giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: GIRAPH-504: Create PartitionContext (majakabiljo)
Date Sat, 09 Feb 2013 00:11:22 GMT
Updated Branches:
  refs/heads/trunk 86c2f657f -> a6cb05bcb


GIRAPH-504: Create PartitionContext (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a6cb05bc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a6cb05bc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a6cb05bc

Branch: refs/heads/trunk
Commit: a6cb05bcb4f0fcbf7477297f15237d3536b6d658
Parents: 86c2f65
Author: Maja Kabiljo <majakabiljo@maja-mbp.thefacebook.com>
Authored: Fri Feb 8 16:07:10 2013 -0800
Committer: Maja Kabiljo <majakabiljo@maja-mbp.thefacebook.com>
Committed: Fri Feb 8 16:09:03 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../java/org/apache/giraph/conf/GiraphClasses.java |   36 +++++
 .../apache/giraph/conf/GiraphConfiguration.java    |   13 ++
 .../org/apache/giraph/conf/GiraphConstants.java    |    2 +
 .../conf/ImmutableClassesGiraphConfiguration.java  |   19 +++
 .../examples/PartitionContextTestVertex.java       |  115 +++++++++++++++
 .../org/apache/giraph/graph/ComputeCallable.java   |   15 ++
 .../java/org/apache/giraph/graph/GraphState.java   |   11 ++
 .../apache/giraph/partition/BasicPartition.java    |  106 +++++++++++++
 .../giraph/partition/ByteArrayPartition.java       |   64 ++-------
 .../giraph/partition/DefaultPartitionContext.java  |   34 +++++
 .../org/apache/giraph/partition/Partition.java     |   12 ++
 .../apache/giraph/partition/PartitionContext.java  |   45 ++++++
 .../apache/giraph/partition/SimplePartition.java   |   56 +------
 .../main/java/org/apache/giraph/vertex/Vertex.java |   10 ++
 .../src/test/java/org/apache/giraph/BspCase.java   |    3 +
 .../org/apache/giraph/TestPartitionContext.java    |   72 +++++++++
 17 files changed, 517 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3524ae3..c060209 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-504: Create PartitionContext (majakabiljo)
+
   GIRAPH-499: Giraph should not reserve minimum reduce slot memory 1024 since we never use it (ereisman)
 
   GIRAPH-508: Increase the limit on the number of partitions (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index d2641f1..5c2a01a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -21,6 +21,8 @@ import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.master.MasterCompute;
@@ -83,6 +85,8 @@ public class GiraphClasses<I extends WritableComparable,
 
   /** Vertex resolver class - cached for fast access */
   protected Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass;
+  /** Partition context class - cached for fast access */
+  protected Class<? extends PartitionContext> partitionContextClass;
   /** Worker context class - cached for fast access */
   protected Class<? extends WorkerContext> workerContextClass;
   /** Master compute class - cached for fast access */
@@ -145,6 +149,8 @@ public class GiraphClasses<I extends WritableComparable,
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
         conf.getClass(VERTEX_RESOLVER_CLASS,
         DefaultVertexResolver.class, VertexResolver.class);
+    partitionContextClass = conf.getClass(PARTITION_CONTEXT_CLASS,
+        DefaultPartitionContext.class, PartitionContext.class);
     workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
         DefaultWorkerContext.class, WorkerContext.class);
     masterComputeClass =  conf.getClass(MASTER_COMPUTE_CLASS,
@@ -329,6 +335,24 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Check if PartitionContext is set
+   *
+   * @return true if PartitionContext is set
+   */
+  public boolean hasPartitionContextClass() {
+    return partitionContextClass != null;
+  }
+
+  /**
+   * Get PartitionContext used
+   *
+   * @return PartitionContext
+   */
+  public Class<? extends PartitionContext> getPartitionContextClass() {
+    return partitionContextClass;
+  }
+
+  /**
    * Check if WorkerContext is set
    *
    * @return true if WorkerContext is set
@@ -523,6 +547,18 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Set PartitionContext used
+   *
+   * @param partitionContextClass PartitionContext class to set
+   * @return this
+   */
+  public GiraphClasses setPartitionContextClass(
+      Class<? extends PartitionContext> partitionContextClass) {
+    this.partitionContextClass = partitionContextClass;
+    return this;
+  }
+
+  /**
    * Set WorkerContext used
    *
    * @param workerContextClass WorkerContext class to set

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 7e48103..dc5c84f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
@@ -276,6 +277,18 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the partition context class (optional)
+   *
+   * @param partitionContextClass Determines what code is executed for each
+   *        partition before and after each superstep
+   */
+  public final void setPartitionContextClass(
+      Class<? extends PartitionContext> partitionContextClass) {
+    setClass(PARTITION_CONTEXT_CLASS, partitionContextClass,
+        PartitionContext.class);
+  }
+
+  /**
    * Set the worker context class (optional)
    *
    * @param workerContextClass Determines what code is executed on a each

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index fb4e8a3..e3d8ff3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -65,6 +65,8 @@ public interface GiraphConstants {
   String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
   /** Message value class */
   String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+  /** Partition context class */
+  String PARTITION_CONTEXT_CLASS = "giraph.partitionContextClass";
   /** Worker context class */
   String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
   /** AggregatorWriter class - optional */

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 30a7da7..3e158af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
@@ -287,6 +288,24 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get the user's subclassed PartitionContext.
+   *
+   * @return User's partition context class
+   */
+  public Class<? extends PartitionContext> getPartitionContextClass() {
+    return classes.getPartitionContextClass();
+  }
+
+  /**
+   * Create a user partition context
+   *
+   * @return Instantiated user partition context
+   */
+  public PartitionContext createPartitionContext() {
+    return ReflectionUtils.newInstance(getPartitionContextClass(), this);
+  }
+
+  /**
    * Get the user's subclassed WorkerContext.
    *
    * @return User's worker context class

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
new file mode 100644
index 0000000..f86c323
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Vertex to test the functionality of PartitionContext
+ */
+public class PartitionContextTestVertex extends
+    EdgeListVertex<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> {
+  /** How many compute threads to use in the test */
+  public static final int NUM_COMPUTE_THREADS = 10;
+  /** How many vertices to create for the test */
+  public static final int NUM_VERTICES = 100;
+  /** How many partitions to have */
+  public static final int NUM_PARTITIONS = 25;
+
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    TestPartitionContextPartitionContext partitionContext =
+        (TestPartitionContextPartitionContext) getPartitionContext();
+    partitionContext.counter++;
+    if (getSuperstep() > 5) {
+      voteToHalt();
+    }
+  }
+
+  /**
+   * PartitionContext for TestPartitionContext
+   */
+  public static class TestPartitionContextPartitionContext extends
+      DefaultPartitionContext {
+    /**
+     * The counter should hold the number of vertices in this partition,
+     * plus the current superstep
+     */
+    private long counter;
+
+    @Override
+    public void preSuperstep(WorkerContext workerContext) {
+      counter =
+          ((TestPartitionContextWorkerContext) workerContext).superstepCounter;
+    }
+
+    @Override
+    public void postSuperstep(WorkerContext workerContext) {
+      ((TestPartitionContextWorkerContext) workerContext).totalCounter +=
+          counter;
+    }
+  }
+
+  /**
+   * WorkerContext for TestPartitionContext
+   */
+  public static class TestPartitionContextWorkerContext extends
+      DefaultWorkerContext {
+    /** Current superstep */
+    private long superstepCounter;
+    /**
+     * This counter should hold the sum of PartitionContext's counters
+     */
+    private long totalCounter;
+
+    @Override
+    public void preSuperstep() {
+      superstepCounter = getSuperstep();
+      totalCounter = 0;
+    }
+
+    @Override
+    public void postSuperstep() {
+      assertEquals(totalCounter,
+          NUM_PARTITIONS * superstepCounter + getTotalNumVertices());
+    }
+  }
+
+  /**
+   * Throws exception if values are not equal.
+   *
+   * @param expected Expected value
+   * @param actual   Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 94ed6d9..c7aff7c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -27,6 +27,7 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.TimerDesc;
 import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
@@ -34,6 +35,7 @@ import org.apache.giraph.time.Times;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -197,6 +199,15 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         new PartitionStats(partition.getId(), 0, 0, 0, 0);
     // Make sure this is thread-safe across runs
     synchronized (partition) {
+      // Prepare Partition context
+      WorkerContext workerContext =
+          graphState.getGraphTaskManager().getWorkerContext();
+      PartitionContext partitionContext = partition.getPartitionContext();
+      synchronized (workerContext) {
+        partitionContext.preSuperstep(workerContext);
+      }
+      graphState.setPartitionContext(partition.getPartitionContext());
+
       for (Vertex<I, V, E, M> vertex : partition) {
         // Make sure every vertex has this thread's
         // graphState before computing
@@ -229,6 +240,10 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       }
 
       messageStore.clearPartition(partition.getId());
+
+      synchronized (workerContext) {
+        partitionContext.postSuperstep(workerContext);
+      }
     }
     return partitionStats;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
index 9cdec7c..93ad5df 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -49,6 +50,8 @@ E extends Writable, M extends Writable> {
   workerClientRequestProcessor;
   /** Worker aggregator usage */
   private final WorkerAggregatorUsage workerAggregatorUsage;
+  /** Partition context */
+  private PartitionContext partitionContext;
 
   /**
    * Constructor
@@ -106,6 +109,14 @@ E extends Writable, M extends Writable> {
     return workerAggregatorUsage;
   }
 
+  public void setPartitionContext(PartitionContext partitionContext) {
+    this.partitionContext = partitionContext;
+  }
+
+  public PartitionContext getPartitionContext() {
+    return partitionContext;
+  }
+
   @Override
   public String toString() {
     return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
new file mode 100644
index 0000000..dc9192e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Basic partition class for other partitions to extend. Holds partition id,
+ * configuration, progressable and partition context
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class BasicPartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /** Context used to report progress */
+  private Progressable progressable;
+  /** Partition context */
+  private PartitionContext partitionContext;
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    partitionContext = conf.createPartitionContext();
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public PartitionContext getPartitionContext() {
+    return partitionContext;
+  }
+
+  @Override
+  public void progress() {
+    if (progressable != null) {
+      progressable.progress();
+    }
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    id = input.readInt();
+    partitionContext = conf.createPartitionContext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index d34af11..1298918 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -27,7 +27,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.WritableUtils;
@@ -48,21 +47,15 @@ import org.apache.log4j.Logger;
  */
 public class ByteArrayPartition<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Partition<I, V, E, M> {
+    extends BasicPartition<I, V, E, M> {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
-  /** Configuration from the worker */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Partition id */
-  private int id;
   /**
    * Vertex map for this range (keyed by index).  Note that the byte[] is a
    * serialized vertex with the first four bytes as the length of the vertex
    * to read.
    */
   private ConcurrentMap<I, byte[]> vertexMap;
-  /** Context used to report progress */
-  private Progressable progressable;
   /** Representative vertex */
   private Vertex<I, V, E, M> representativeVertex;
   /** Use unsafe serialization */
@@ -75,12 +68,11 @@ public class ByteArrayPartition<I extends WritableComparable,
 
   @Override
   public void initialize(int partitionId, Progressable progressable) {
-    setId(partitionId);
-    setProgressable(progressable);
+    super.initialize(partitionId, progressable);
     vertexMap = new MapMaker().concurrencyLevel(
-        conf.getNettyServerExecutionConcurrency()).makeMap();
-    representativeVertex = conf.createVertex();
-    useUnsafeSerialization = conf.useUnsafeSerialization();
+        getConf().getNettyServerExecutionConcurrency()).makeMap();
+    representativeVertex = getConf().createVertex();
+    useUnsafeSerialization = getConf().useUnsafeSerialization();
   }
 
   @Override
@@ -152,21 +144,6 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  @Override
-  public void setProgressable(Progressable progressable) {
-    this.progressable = progressable;
-  }
-
-  @Override
   public void saveVertex(Vertex<I, V, E, M> vertex) {
     // Reuse the old buffer whenever possible
     byte[] oldVertexData = vertexMap.get(vertex.getId());
@@ -183,12 +160,10 @@ public class ByteArrayPartition<I extends WritableComparable,
 
   @Override
   public void write(DataOutput output) throws IOException {
-    output.writeInt(id);
+    super.write(output);
     output.writeInt(vertexMap.size());
     for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
-      if (progressable != null) {
-        progressable.progress();
-      }
+      progress();
       entry.getKey().write(output);
       // Note here that we are writing the size of the vertex data first
       // as it is encoded in the first four bytes of the byte[]
@@ -207,18 +182,16 @@ public class ByteArrayPartition<I extends WritableComparable,
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    id = input.readInt();
+    super.readFields(input);
     int size = input.readInt();
     vertexMap = new MapMaker().concurrencyLevel(
-        conf.getNettyServerExecutionConcurrency()).initialCapacity(
+        getConf().getNettyServerExecutionConcurrency()).initialCapacity(
         size).makeMap();
-    representativeVertex = conf.createVertex();
-    useUnsafeSerialization = conf.useUnsafeSerialization();
+    representativeVertex = getConf().createVertex();
+    useUnsafeSerialization = getConf().useUnsafeSerialization();
     for (int i = 0; i < size; ++i) {
-      if (progressable != null) {
-        progressable.progress();
-      }
-      I vertexId = conf.createVertexId();
+      progress();
+      I vertexId = getConf().createVertexId();
       vertexId.readFields(input);
       int vertexDataSize = input.readInt();
       byte[] vertexData = new byte[vertexDataSize];
@@ -231,17 +204,6 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
-    conf = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
   public Iterator<Vertex<I, V, E, M>> iterator() {
     return new RepresentativeVertexIterator();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
new file mode 100644
index 0000000..c22c802
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.WorkerContext;
+
+/**
+ * Empty implementation of {@link PartitionContext}
+ */
+public class DefaultPartitionContext implements PartitionContext {
+  @Override
+  public void preSuperstep(WorkerContext workerContext) {
+  }
+
+  @Override
+  public void postSuperstep(WorkerContext workerContext) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index 55ce8c0..657c054 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -107,6 +107,11 @@ public interface Partition<I extends WritableComparable,
   void setId(int id);
 
   /**
+   * Report progress.
+   */
+  void progress();
+
+  /**
    * Set the context.
    *
    * @param progressable Progressable
@@ -119,4 +124,11 @@ public interface Partition<I extends WritableComparable,
    * @param vertex Vertex to save
    */
   void saveVertex(Vertex<I, V, E, M> vertex);
+
+  /**
+   * Get partition context
+   *
+   * @return Partition context
+   */
+  PartitionContext getPartitionContext();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
new file mode 100644
index 0000000..412f6e3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.WorkerContext;
+
+/**
+ * PartitionContext allows for the execution of user code
+ * on a per-partition basis. There's one PartitionContext per partition.
+ */
+public interface PartitionContext {
+  /**
+   * Execute user code.
+   * This method is executed once for each partition before computation for
+   * that partition starts.
+   *
+   * @param workerContext Worker context
+   */
+  void preSuperstep(WorkerContext workerContext);
+
+  /**
+   * Execute user code.
+   * This method is executed once on for each partition after computation in
+   * current superstep for that partition ends.
+   *
+   * @param workerContext Worker context
+   */
+  void postSuperstep(WorkerContext workerContext);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 479011f..cbf6bc3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -46,15 +45,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
 @SuppressWarnings("rawtypes")
 public class SimplePartition<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Partition<I, V, E, M> {
-  /** Configuration from the worker */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Partition id */
-  private int id;
+    extends BasicPartition<I, V, E, M> {
   /** Vertex map for this range (keyed by index) */
   private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
-  /** Context used to report progress */
-  private Progressable progressable;
 
   /**
    * Constructor for reflection.
@@ -63,9 +56,8 @@ public class SimplePartition<I extends WritableComparable,
 
   @Override
   public void initialize(int partitionId, Progressable progressable) {
-    setId(partitionId);
-    setProgressable(progressable);
-    if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+    super.initialize(partitionId, progressable);
+    if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
         GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
     } else {
@@ -110,21 +102,6 @@ public class SimplePartition<I extends WritableComparable,
   }
 
   @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  @Override
-  public void setProgressable(Progressable progressable) {
-    this.progressable = progressable;
-  }
-
-  @Override
   public void saveVertex(Vertex<I, V, E, M> vertex) {
     // No-op, vertices are stored as Java objects in this partition
   }
@@ -136,19 +113,17 @@ public class SimplePartition<I extends WritableComparable,
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+    super.readFields(input);
+    if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
         GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
     }
-    id = input.readInt();
     int vertices = input.readInt();
     for (int i = 0; i < vertices; ++i) {
-      Vertex<I, V, E, M> vertex = conf.createVertex();
-      if (progressable != null) {
-        progressable.progress();
-      }
+      Vertex<I, V, E, M> vertex = getConf().createVertex();
+      progress();
       vertex.readFields(input);
       if (vertexMap.put(vertex.getId(), vertex) != null) {
         throw new IllegalStateException(
@@ -160,28 +135,15 @@ public class SimplePartition<I extends WritableComparable,
 
   @Override
   public void write(DataOutput output) throws IOException {
-    output.writeInt(id);
+    super.write(output);
     output.writeInt(vertexMap.size());
     for (Vertex vertex : vertexMap.values()) {
-      if (progressable != null) {
-        progressable.progress();
-      }
+      progress();
       vertex.write(output);
     }
   }
 
   @Override
-  public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
-    this.conf = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
   public Iterator<Vertex<I, V, E, M>> iterator() {
     return vertexMap.values().iterator();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
index 974232e..db6dca3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.partition.PartitionContext;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
@@ -328,6 +329,15 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   /**
+   * Get the partition context
+   *
+   * @return Partition context
+   */
+  public PartitionContext getPartitionContext() {
+    return getGraphState().getPartitionContext();
+  }
+
+  /**
    * Get the worker context
    *
    * @return WorkerContext context

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index 6aab533..0fe9fda 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -181,6 +181,9 @@ public class BspCase implements Watcher {
     if (classes.hasVertexOutputFormat()) {
       conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
     }
+    if (classes.hasPartitionContextClass()) {
+      conf.setPartitionContextClass(classes.getPartitionContextClass());
+    }
     if (classes.hasWorkerContextClass()) {
       conf.setWorkerContextClass(classes.getWorkerContextClass());
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java
new file mode 100644
index 0000000..cdf1f65
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.PartitionContextTestVertex;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.partition.HashMasterPartitioner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+public class TestPartitionContext extends BspCase {
+  public TestPartitionContext() {
+    super(TestPartitionContext.class.getName());
+  }
+
+  @Test
+  public void testPartitionContext() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    if (runningInDistributedMode()) {
+      System.out.println(
+          "testPartitionContext: Ignore this test in distributed mode.");
+      return;
+    }
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(PartitionContextTestVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    classes.setWorkerContextClass(
+        PartitionContextTestVertex.TestPartitionContextWorkerContext.class);
+    classes.setPartitionContextClass(
+        PartitionContextTestVertex.TestPartitionContextPartitionContext.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    // Use multithreading
+    job.getConfiguration().setNumComputeThreads(
+        PartitionContextTestVertex.NUM_COMPUTE_THREADS);
+    // Increase the number of vertices
+    job.getConfiguration().setInt(
+        GeneratedVertexReader.READER_VERTICES,
+        PartitionContextTestVertex.NUM_VERTICES);
+    // Increase the number of partitions
+    job.getConfiguration().setInt(
+        HashMasterPartitioner.USER_PARTITION_COUNT,
+        PartitionContextTestVertex.NUM_PARTITIONS);
+    assertTrue(job.run(true));
+  }
+}


Mime
View raw message