giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [05/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)
Date Mon, 20 May 2013 17:27:01 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 0039ad6..d210928 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -22,12 +22,13 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.ArrayListEdges;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.partition.BasicPartitionOwner;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.edge.ArrayListEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -55,7 +56,7 @@ public class MockUtils {
     public static class MockedEnvironment<I extends WritableComparable,
             V extends Writable, E extends Writable, M extends Writable> {
 
-        private final GraphState<I, V, E, M> graphState;
+        private final GraphState graphState;
         private final Mapper.Context context;
         private final Configuration conf;
         private final WorkerClientRequestProcessor workerClientRequestProcessor;
@@ -101,14 +102,15 @@ public class MockUtils {
     }
 
     /**
-     * prepare a vertex for use in a unit test by setting its internal state and injecting mocked
-     * dependencies,
+     * prepare a vertex and computation for use in a unit test by setting its
+     * internal state and injecting mocked dependencies,
      *
-     * @param vertex
-     * @param superstep the superstep to emulate
+     * @param vertex Vertex
      * @param vertexId initial vertex id
      * @param vertexValue initial vertex value
      * @param isHalted initial halted state of the vertex
+     * @param computation Computation
+     * @param superstep Superstep
      * @param <I> vertex id
      * @param <V> vertex data
      * @param <E> edge data
@@ -116,47 +118,42 @@ public class MockUtils {
      * @return
      * @throws Exception
      */
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable>
-            MockedEnvironment<I, V, E, M> prepareVertex(
-            Vertex<I, V, E, M> vertex, long superstep, I vertexId,
-            V vertexValue, boolean isHalted) throws Exception {
-
-        MockedEnvironment<I, V, E, M>  env =
-                new MockedEnvironment<I, V, E, M>();
-
-        Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
-        Mockito.when(env.getGraphState().getContext())
-                .thenReturn(env.getContext());
-        Mockito.when(env.getContext().getConfiguration())
-                .thenReturn(env.getConfiguration());
-        Mockito.when(env.getGraphState().getWorkerClientRequestProcessor())
-                .thenReturn(env.getWorkerClientRequestProcessor());
-
-        GiraphConfiguration giraphConf = new GiraphConfiguration();
-        giraphConf.setVertexClass(vertex.getClass());
-        ImmutableClassesGiraphConfiguration<I, V, E, M> conf =
-            new ImmutableClassesGiraphConfiguration<I, V, E, M>(giraphConf);
-        vertex.setConf(conf);
-        ArrayListEdges<I, E> edges = new ArrayListEdges<I, E>();
-        edges.setConf((ImmutableClassesGiraphConfiguration<I, Writable, E,
-            Writable>) conf);
-        edges.initialize();
-
-        ReflectionUtils.setField(vertex, "id", vertexId);
-        ReflectionUtils.setField(vertex, "value", vertexValue);
-        ReflectionUtils.setField(vertex, "edges", edges);
-        ReflectionUtils.setField(vertex, "graphState", env.getGraphState());
-        ReflectionUtils.setField(vertex, "halt", isHalted);
-
-        return env;
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  MockedEnvironment<I, V, E, M> prepareVertexAndComputation(
+      Vertex<I, V, E> vertex, I vertexId, V vertexValue, boolean isHalted,
+      Computation<I, V, E, M, M> computation, long superstep) throws
+      Exception {
+    MockedEnvironment<I, V, E, M> env = new MockedEnvironment<I, V, E, M>();
+    Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
+    Mockito.when(env.getGraphState().getContext())
+        .thenReturn(env.getContext());
+    Mockito.when(env.getContext().getConfiguration())
+        .thenReturn(env.getConfiguration());
+    computation.initialize(env.getGraphState(),
+        env.getWorkerClientRequestProcessor(), null, null, null);
+
+    GiraphConfiguration giraphConf = new GiraphConfiguration();
+    giraphConf.setComputationClass(computation.getClass());
+    giraphConf.setOutEdgesClass(ArrayListEdges.class);
+    ImmutableClassesGiraphConfiguration<I, V, E> conf =
+        new ImmutableClassesGiraphConfiguration<I, V, E>(giraphConf);
+    computation.setConf(conf);
+
+    vertex.setConf(conf);
+    vertex.initialize(vertexId, vertexValue);
+    if (isHalted) {
+      vertex.voteToHalt();
     }
 
+    return env;
+  }
+
   public static CentralizedServiceWorker<IntWritable, IntWritable,
-      IntWritable, IntWritable> mockServiceGetVertexPartitionOwner(final int
+      IntWritable> mockServiceGetVertexPartitionOwner(final int
       numOfPartitions) {
-    CentralizedServiceWorker<IntWritable, IntWritable, IntWritable,
-        IntWritable> service = Mockito.mock(CentralizedServiceWorker.class);
+    CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service =
+        Mockito.mock(CentralizedServiceWorker.class);
     Answer<PartitionOwner> answer = new Answer<PartitionOwner>() {
       @Override
       public PartitionOwner answer(InvocationOnMock invocation) throws
@@ -170,10 +167,10 @@ public class MockUtils {
     return service;
   }
 
-  public static ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  public static ServerData<IntWritable, IntWritable, IntWritable>
   createNewServerData(ImmutableClassesGiraphConfiguration conf,
       Mapper.Context context) {
-    return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+    return new ServerData<IntWritable, IntWritable, IntWritable>(
         Mockito.mock(CentralizedServiceWorker.class),
         conf,
         ByteArrayMessagesPerVertexStore.newFactory(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java b/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java
new file mode 100644
index 0000000..630888f
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Computation which does nothing, just halts, used for testing
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class NoOpComputation<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable>
+    extends BasicComputation<I, V, E, M> {
+  @Override
+  public void compute(Vertex<I, V, E> vertex,
+      Iterable<M> messages) throws IOException {
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
deleted file mode 100644
index c98d580..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertices;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-public class IntIntNullVertexDoNothing extends VertexDoNothing<IntWritable,
-    IntWritable, NullWritable, NullWritable> {
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
deleted file mode 100644
index 9060bc7..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertices;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-public class VertexCountEdges extends Vertex<IntWritable, IntWritable,
-    NullWritable, NullWritable> {
-  @Override
-  public void compute(Iterable<NullWritable> messages) throws IOException {
-    setValue(new IntWritable(getNumEdges()));
-    voteToHalt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
deleted file mode 100644
index fac3fce..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertices;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-public class VertexDoNothing<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> extends Vertex<I, V, E, M> {
-  @Override
-  public void compute(Iterable<M> messages) throws IOException {
-    voteToHalt();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
index f9d5544..ed365b4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
+++ b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
@@ -61,11 +62,12 @@ public class TestYarnJob implements Watcher {
   /**
    * Simple No-Op vertex to test if we can run a quick Giraph job on YARN.
    */
-  private static class DummyYarnVertex extends Vertex<IntWritable, IntWritable,
-      NullWritable, IntWritable> {
+  private static class DummyYarnComputation extends BasicComputation<
+      IntWritable, IntWritable, NullWritable, IntWritable> {
     @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-      voteToHalt();
+    public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+      vertex.voteToHalt();
     }
   }
 
@@ -203,7 +205,7 @@ public class TestYarnJob implements Watcher {
     conf.setEventWaitMsecs(3 * 1000);
     conf.setYarnLibJars(""); // no need
     conf.setYarnTaskHeapMb(256); // small since no work to be done
-    conf.setVertexClass(DummyYarnVertex.class);
+    conf.setComputationClass(DummyYarnComputation.class);
     conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
     conf.setNumComputeThreads(1);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
new file mode 100644
index 0000000..db527f2
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/** Computation which uses aggrergators. To be used for testing. */
+public class AggregatorsTestComputation extends
+    BasicComputation<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> {
+
+  /** Name of regular aggregator */
+  private static final String REGULAR_AGG = "regular";
+  /** Name of persistent aggregator */
+  private static final String PERSISTENT_AGG = "persistent";
+  /** Name of master overwriting aggregator */
+  private static final String MASTER_WRITE_AGG = "master";
+  /** Value which master compute will use */
+  private static final long MASTER_VALUE = 12345;
+  /** Prefix for name of aggregators in array */
+  private static final String ARRAY_PREFIX_AGG = "array";
+  /** Number of aggregators to use in array */
+  private static final int NUM_OF_AGGREGATORS_IN_ARRAY = 100;
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    long superstep = getSuperstep();
+
+    LongWritable myValue = new LongWritable(1L << superstep);
+    aggregate(REGULAR_AGG, myValue);
+    aggregate(PERSISTENT_AGG, myValue);
+
+    long nv = getTotalNumVertices();
+    if (superstep > 0) {
+      assertEquals(nv * (1L << (superstep - 1)),
+          ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+    } else {
+      assertEquals(0,
+          ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+    }
+    assertEquals(nv * ((1L << superstep) - 1),
+        ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+    assertEquals(MASTER_VALUE * (1L << superstep),
+        ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
+
+    for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+      aggregate(ARRAY_PREFIX_AGG + i, new LongWritable((superstep + 1) * i));
+      assertEquals(superstep * getTotalNumVertices() * i,
+          ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+    }
+
+    if (getSuperstep() == 10) {
+      vertex.voteToHalt();
+    }
+  }
+
+  /** Master compute which uses aggregators. To be used for testing. */
+  public static class AggregatorsTestMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void compute() {
+      long superstep = getSuperstep();
+
+      LongWritable myValue =
+          new LongWritable(MASTER_VALUE * (1L << superstep));
+      setAggregatedValue(MASTER_WRITE_AGG, myValue);
+
+      long nv = getTotalNumVertices();
+      if (superstep > 0) {
+        assertEquals(nv * (1L << (superstep - 1)),
+            ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+      } else {
+        assertEquals(0,
+            ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+      }
+      assertEquals(nv * ((1L << superstep) - 1),
+          ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+
+      for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+        assertEquals(superstep * getTotalNumVertices() * i,
+            ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+      }
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(REGULAR_AGG, LongSumAggregator.class);
+      registerPersistentAggregator(PERSISTENT_AGG,
+          LongSumAggregator.class);
+      registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
+
+      for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+        registerAggregator(ARRAY_PREFIX_AGG + i, LongSumAggregator.class);
+      }
+    }
+  }
+
+  /**
+   * Throws exception if values are not equal.
+   *
+   * @param expected Expected value
+   * @param actual   Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
deleted file mode 100644
index d08519b..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/** Vertex which uses aggrergators. To be used for testing. */
-public class AggregatorsTestVertex extends
-    Vertex<LongWritable, DoubleWritable, FloatWritable,
-        DoubleWritable> {
-
-  /** Name of regular aggregator */
-  private static final String REGULAR_AGG = "regular";
-  /** Name of persistent aggregator */
-  private static final String PERSISTENT_AGG = "persistent";
-  /** Name of master overwriting aggregator */
-  private static final String MASTER_WRITE_AGG = "master";
-  /** Value which master compute will use */
-  private static final long MASTER_VALUE = 12345;
-  /** Prefix for name of aggregators in array */
-  private static final String ARRAY_PREFIX_AGG = "array";
-  /** Number of aggregators to use in array */
-  private static final int NUM_OF_AGGREGATORS_IN_ARRAY = 100;
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    long superstep = getSuperstep();
-
-    LongWritable myValue = new LongWritable(1L << superstep);
-    aggregate(REGULAR_AGG, myValue);
-    aggregate(PERSISTENT_AGG, myValue);
-
-    long nv = getTotalNumVertices();
-    if (superstep > 0) {
-      assertEquals(nv * (1L << (superstep - 1)),
-          ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
-    } else {
-      assertEquals(0,
-          ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
-    }
-    assertEquals(nv * ((1L << superstep) - 1),
-        ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
-    assertEquals(MASTER_VALUE * (1L << superstep),
-        ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
-
-    for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
-      aggregate(ARRAY_PREFIX_AGG + i, new LongWritable((superstep + 1) * i));
-      assertEquals(superstep * getTotalNumVertices() * i,
-          ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
-    }
-
-    if (getSuperstep() == 10) {
-      voteToHalt();
-    }
-  }
-
-  /** Master compute which uses aggregators. To be used for testing. */
-  public static class AggregatorsTestMasterCompute extends
-      DefaultMasterCompute {
-    @Override
-    public void compute() {
-      long superstep = getSuperstep();
-
-      LongWritable myValue =
-          new LongWritable(MASTER_VALUE * (1L << superstep));
-      setAggregatedValue(MASTER_WRITE_AGG, myValue);
-
-      long nv = getTotalNumVertices();
-      if (superstep > 0) {
-        assertEquals(nv * (1L << (superstep - 1)),
-            ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
-      } else {
-        assertEquals(0,
-            ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
-      }
-      assertEquals(nv * ((1L << superstep) - 1),
-          ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
-
-      for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
-        assertEquals(superstep * getTotalNumVertices() * i,
-            ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
-      }
-    }
-
-    @Override
-    public void initialize() throws InstantiationException,
-        IllegalAccessException {
-      registerAggregator(REGULAR_AGG, LongSumAggregator.class);
-      registerPersistentAggregator(PERSISTENT_AGG,
-          LongSumAggregator.class);
-      registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
-
-      for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
-        registerAggregator(ARRAY_PREFIX_AGG + i, LongSumAggregator.class);
-      }
-    }
-  }
-
-  /**
-   * Throws exception if values are not equal.
-   *
-   * @param expected Expected value
-   * @param actual   Actual value
-   */
-  private static void assertEquals(long expected, long actual) {
-    if (expected != actual) {
-      throw new RuntimeException("expected: " + expected +
-          ", actual: " + actual);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java
new file mode 100644
index 0000000..9b0cfe1
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of the HCC algorithm that identifies connected components and
+ * assigns each vertex its "component identifier" (the smallest vertex id
+ * in the component)
+ *
+ * The idea behind the algorithm is very simple: propagate the smallest
+ * vertex id along the edges to all vertices of a connected component. The
+ * number of supersteps necessary is equal to the length of the maximum
+ * diameter of all components + 1
+ *
+ * The original Hadoop-based variant of this algorithm was proposed by Kang,
+ * Charalampos, Tsourakakis and Faloutsos in
+ * "PEGASUS: Mining Peta-Scale Graphs", 2010
+ *
+ * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
+ */
+@Algorithm(
+    name = "Connected components",
+    description = "Finds connected components of the graph"
+)
+public class ConnectedComponentsComputation extends
+    BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
+  /**
+   * Propagates the smallest vertex id to all neighbors. Will always choose to
+   * halt and only reactivate if a smaller id has been sent to it.
+   *
+   * @param vertex Vertex
+   * @param messages Iterator of messages from the previous superstep.
+   * @throws IOException
+   */
+  @Override
+  public void compute(
+      Vertex<IntWritable, IntWritable, NullWritable> vertex,
+      Iterable<IntWritable> messages) throws IOException {
+    int currentComponent = vertex.getValue().get();
+
+    // First superstep is special, because we can simply look at the neighbors
+    if (getSuperstep() == 0) {
+      for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
+        int neighbor = edge.getTargetVertexId().get();
+        if (neighbor < currentComponent) {
+          currentComponent = neighbor;
+        }
+      }
+      // Only need to send value if it is not the own id
+      if (currentComponent != vertex.getValue().get()) {
+        vertex.setValue(new IntWritable(currentComponent));
+        for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
+          IntWritable neighbor = edge.getTargetVertexId();
+          if (neighbor.get() > currentComponent) {
+            sendMessage(neighbor, vertex.getValue());
+          }
+        }
+      }
+
+      vertex.voteToHalt();
+      return;
+    }
+
+    boolean changed = false;
+    // did we get a smaller id ?
+    for (IntWritable message : messages) {
+      int candidateComponent = message.get();
+      if (candidateComponent < currentComponent) {
+        currentComponent = candidateComponent;
+        changed = true;
+      }
+    }
+
+    // propagate new component id to the neighbors
+    if (changed) {
+      vertex.setValue(new IntWritable(currentComponent));
+      sendMessageToAllEdges(vertex, vertex.getValue());
+    }
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
deleted file mode 100644
index dbeb6bf..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-/**
- * Implementation of the HCC algorithm that identifies connected components and
- * assigns each vertex its "component identifier" (the smallest vertex id
- * in the component)
- *
- * The idea behind the algorithm is very simple: propagate the smallest
- * vertex id along the edges to all vertices of a connected component. The
- * number of supersteps necessary is equal to the length of the maximum
- * diameter of all components + 1
- *
- * The original Hadoop-based variant of this algorithm was proposed by Kang,
- * Charalampos, Tsourakakis and Faloutsos in
- * "PEGASUS: Mining Peta-Scale Graphs", 2010
- *
- * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
- */
-@Algorithm(
-    name = "Connected components",
-    description = "Finds connected components of the graph"
-)
-public class ConnectedComponentsVertex extends Vertex<IntWritable,
-    IntWritable, NullWritable, IntWritable> {
-  /**
-   * Propagates the smallest vertex id to all neighbors. Will always choose to
-   * halt and only reactivate if a smaller id has been sent to it.
-   *
-   * @param messages Iterator of messages from the previous superstep.
-   * @throws IOException
-   */
-  @Override
-  public void compute(Iterable<IntWritable> messages) throws IOException {
-    int currentComponent = getValue().get();
-
-    // First superstep is special, because we can simply look at the neighbors
-    if (getSuperstep() == 0) {
-      for (Edge<IntWritable, NullWritable> edge : getEdges()) {
-        int neighbor = edge.getTargetVertexId().get();
-        if (neighbor < currentComponent) {
-          currentComponent = neighbor;
-        }
-      }
-      // Only need to send value if it is not the own id
-      if (currentComponent != getValue().get()) {
-        setValue(new IntWritable(currentComponent));
-        for (Edge<IntWritable, NullWritable> edge : getEdges()) {
-          IntWritable neighbor = edge.getTargetVertexId();
-          if (neighbor.get() > currentComponent) {
-            sendMessage(neighbor, getValue());
-          }
-        }
-      }
-
-      voteToHalt();
-      return;
-    }
-
-    boolean changed = false;
-    // did we get a smaller id ?
-    for (IntWritable message : messages) {
-      int candidateComponent = message.get();
-      if (candidateComponent < currentComponent) {
-        currentComponent = candidateComponent;
-        changed = true;
-      }
-    }
-
-    // propagate new component id to the neighbors
-    if (changed) {
-      setValue(new IntWritable(currentComponent));
-      sendMessageToAllEdges(getValue());
-    }
-    voteToHalt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java
new file mode 100644
index 0000000..12a58d5
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * User applications can subclass IdentityComputation, which
+ * simply prints the results that have been read for testing IO related
+ * jobs under any inputformat
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class IdentityComputation<I extends WritableComparable,
+  V extends Writable, E extends Writable, M extends Writable>
+  extends BasicComputation<I, V, E, M> {
+  @Override
+  public void compute(Vertex<I, V, E> vertex,
+      Iterable<M> messages) throws IOException {
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java
deleted file mode 100644
index 30cca86..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * User applications can subclass IdentityVertex, which
- * simply prints the results that have been read for testing IO related
- * jobs under any inputformat
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-
-public abstract class IdentityVertex<I extends WritableComparable,
-  V extends Writable, E extends Writable, M extends Writable>
-  extends Vertex<I, V, E, M> {
-
-  @Override
-  public void compute(Iterable<M> messages) {
-    voteToHalt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
index 62bea5a..f56b4f6 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -44,10 +43,10 @@ public class LongDoubleDoubleTextInputFormat
     extends TextVertexInputFormat<LongWritable, DoubleWritable,
     DoubleWritable>
     implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
-    DoubleWritable, Writable> {
+    DoubleWritable> {
   /** Configuration. */
   private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      DoubleWritable, Writable> conf;
+      DoubleWritable> conf;
 
   @Override
   public TextVertexReader createVertexReader(InputSplit split,
@@ -58,13 +57,13 @@ public class LongDoubleDoubleTextInputFormat
 
   @Override
   public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
-      DoubleWritable, DoubleWritable, Writable> configuration) {
+      DoubleWritable, DoubleWritable> configuration) {
     this.conf = configuration;
   }
 
   @Override
   public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      DoubleWritable, Writable> getConf() {
+      DoubleWritable> getConf() {
     return conf;
   }
 
@@ -79,9 +78,9 @@ public class LongDoubleDoubleTextInputFormat
     private final Pattern separator = Pattern.compile("[\t ]");
 
     @Override
-    public Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+    public Vertex<LongWritable, DoubleWritable, DoubleWritable>
     getCurrentVertex() throws IOException, InterruptedException {
-      Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+      Vertex<LongWritable, DoubleWritable, DoubleWritable>
         vertex = conf.createVertex();
 
       String[] tokens =

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
index fdc9050..bfb5f40 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.giraph.io.formats.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -42,10 +41,10 @@ import java.util.regex.Pattern;
 public class LongDoubleNullTextInputFormat
     extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable>
     implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
-    NullWritable, Writable> {
+    NullWritable> {
   /** Configuration. */
   private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      NullWritable, Writable> conf;
+      NullWritable> conf;
 
   @Override
   public TextVertexReader createVertexReader(InputSplit split,
@@ -56,13 +55,13 @@ public class LongDoubleNullTextInputFormat
 
   @Override
   public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
-      DoubleWritable, NullWritable, Writable> configuration) {
+      DoubleWritable, NullWritable> configuration) {
     this.conf = configuration;
   }
 
   @Override
   public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      NullWritable, Writable> getConf() {
+      NullWritable> getConf() {
     return conf;
   }
 
@@ -77,9 +76,9 @@ public class LongDoubleNullTextInputFormat
     private final Pattern separator = Pattern.compile("[\t ]");
 
     @Override
-    public Vertex<LongWritable, DoubleWritable, NullWritable, ?>
+    public Vertex<LongWritable, DoubleWritable, NullWritable>
     getCurrentVertex() throws IOException, InterruptedException {
-      Vertex<LongWritable, DoubleWritable, NullWritable, ?>
+      Vertex<LongWritable, DoubleWritable, NullWritable>
           vertex = conf.createVertex();
 
       String[] tokens =

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
index 7dc8475..5023a4e 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -45,10 +44,10 @@ public class NormalizingLongDoubleDoubleTextInputFormat
     extends
     TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>
     implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
-    DoubleWritable, Writable> {
+    DoubleWritable> {
   /** Configuration. */
   private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      DoubleWritable, Writable> conf;
+      DoubleWritable> conf;
 
   @Override
   public TextVertexReader createVertexReader(
@@ -58,13 +57,13 @@ public class NormalizingLongDoubleDoubleTextInputFormat
 
   @Override
   public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
-      DoubleWritable, DoubleWritable, Writable> configuration) {
+      DoubleWritable, DoubleWritable> configuration) {
     conf = configuration;
   }
 
   @Override
   public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
-      DoubleWritable, Writable> getConf() {
+      DoubleWritable> getConf() {
     return conf;
   }
 
@@ -81,10 +80,10 @@ public class NormalizingLongDoubleDoubleTextInputFormat
 
     @Override
     public Vertex<LongWritable, DoubleWritable,
-        DoubleWritable, ?> getCurrentVertex()
+        DoubleWritable> getCurrentVertex()
       throws IOException, InterruptedException {
-      Vertex<LongWritable, DoubleWritable,
-      DoubleWritable, ?> vertex = conf.createVertex();
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
+          conf.createVertex();
 
       String[] tokens = edgeSeparator.split(getRecordReader()
           .getCurrentValue().toString());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java
new file mode 100644
index 0000000..9ac90d9
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * The PageRank algorithm, with uniform transition probabilities on the edges
+ * http://en.wikipedia.org/wiki/PageRank
+ */
+public class PageRankComputation extends RandomWalkComputation<NullWritable> {
+  @Override
+  protected double transitionProbability(
+      Vertex<LongWritable, DoubleWritable, NullWritable> vertex,
+      double stateProbability, Edge<LongWritable, NullWritable> edge) {
+    return stateProbability / vertex.getNumEdges();
+  }
+
+  @Override
+  protected double recompute(
+      Vertex<LongWritable, DoubleWritable, NullWritable> vertex,
+      Iterable<DoubleWritable> partialRanks, double teleportationProbability) {
+    // rank contribution from incident neighbors
+    double rankFromNeighbors = MathUtils.sum(partialRanks);
+    // rank contribution from dangling vertices
+    double danglingContribution =
+        getDanglingProbability() / getTotalNumVertices();
+
+    // recompute rank
+    return (1d - teleportationProbability) *
+        (rankFromNeighbors + danglingContribution) +
+        teleportationProbability / getTotalNumVertices();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
deleted file mode 100644
index 9678b31..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.utils.MathUtils;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-
-/**
- * The PageRank algorithm, with uniform transition probabilities on the edges
- * http://en.wikipedia.org/wiki/PageRank
- */
-public class PageRankVertex extends RandomWalkVertex<NullWritable> {
-
-  @Override
-  protected double transitionProbability(double stateProbability,
-      Edge<LongWritable, NullWritable> edge) {
-    return stateProbability / getNumEdges();
-  }
-
-  @Override
-  protected double recompute(Iterable<DoubleWritable> partialRanks,
-                             double teleportationProbability) {
-
-    // rank contribution from incident neighbors
-    double rankFromNeighbors = MathUtils.sum(partialRanks);
-    // rank contribution from dangling vertices
-    double danglingContribution =
-        getDanglingProbability() / getTotalNumVertices();
-
-    // recompute rank
-    return (1d - teleportationProbability) *
-        (rankFromNeighbors + danglingContribution) +
-        teleportationProbability / getTotalNumVertices();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
deleted file mode 100644
index f617d8e..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.partition.DefaultPartitionContext;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.DefaultWorkerContext;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/**
- * Vertex to test the functionality of PartitionContext
- */
-public class PartitionContextTestVertex extends
-    Vertex<LongWritable, DoubleWritable, FloatWritable,
-        DoubleWritable> {
-  /** How many compute threads to use in the test */
-  public static final int NUM_COMPUTE_THREADS = 10;
-  /** How many vertices to create for the test */
-  public static final int NUM_VERTICES = 100;
-  /** How many partitions to have */
-  public static final int NUM_PARTITIONS = 25;
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    TestPartitionContextPartitionContext partitionContext =
-        (TestPartitionContextPartitionContext) getPartitionContext();
-    partitionContext.counter++;
-    if (getSuperstep() > 5) {
-      voteToHalt();
-    }
-  }
-
-  /**
-   * PartitionContext for TestPartitionContext
-   */
-  public static class TestPartitionContextPartitionContext extends
-      DefaultPartitionContext {
-    /**
-     * The counter should hold the number of vertices in this partition,
-     * plus the current superstep
-     */
-    private long counter;
-
-    @Override
-    public void preSuperstep(WorkerContext workerContext) {
-      counter =
-          ((TestPartitionContextWorkerContext) workerContext).superstepCounter;
-    }
-
-    @Override
-    public void postSuperstep(WorkerContext workerContext) {
-      ((TestPartitionContextWorkerContext) workerContext).totalCounter +=
-          counter;
-    }
-  }
-
-  /**
-   * WorkerContext for TestPartitionContext
-   */
-  public static class TestPartitionContextWorkerContext extends
-      DefaultWorkerContext {
-    /** Current superstep */
-    private long superstepCounter;
-    /**
-     * This counter should hold the sum of PartitionContext's counters
-     */
-    private long totalCounter;
-
-    @Override
-    public void preSuperstep() {
-      superstepCounter = getSuperstep();
-      totalCounter = 0;
-    }
-
-    @Override
-    public void postSuperstep() {
-      assertEquals(totalCounter,
-          NUM_PARTITIONS * superstepCounter + getTotalNumVertices());
-    }
-  }
-
-  /**
-   * Throws exception if values are not equal.
-   *
-   * @param expected Expected value
-   * @param actual   Actual value
-   */
-  private static void assertEquals(long expected, long actual) {
-    if (expected != actual) {
-      throw new RuntimeException("expected: " + expected +
-          ", actual: " + actual);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java
new file mode 100644
index 0000000..ed95aae
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+/**
+ * Base class for executing a random walk on a graph
+ *
+ * @param <E> edge type
+ */
+public abstract class RandomWalkComputation<E extends Writable>
+    extends BasicComputation<LongWritable, DoubleWritable, E, DoubleWritable> {
+  /** Configuration parameter for the number of supersteps to execute */
+  static final String MAX_SUPERSTEPS = RandomWalkComputation.class.getName() +
+      ".maxSupersteps";
+  /** Configuration parameter for the teleportation probability */
+  static final String TELEPORTATION_PROBABILITY = RandomWalkComputation.class
+      .getName() + ".teleportationProbability";
+  /** Name of aggregator for the probability of dangling vertices */
+  static final String CUMULATIVE_DANGLING_PROBABILITY =
+      RandomWalkComputation.class.getName() + ".cumulativeDanglingProbability";
+  /** Name of aggregator for the probability of all vertices */
+  static final String CUMULATIVE_PROBABILITY = RandomWalkComputation.class
+      .getName() + ".cumulativeProbability";
+    /** Name of aggregator for the probability of dangling vertices */
+  static final String NUM_DANGLING_VERTICES = RandomWalkComputation.class
+      .getName() + ".numDanglingVertices";
+  /** Name of aggregator for the L1 norm of the probability difference, used
+   * for covergence detection */
+  static final String L1_NORM_OF_PROBABILITY_DIFFERENCE =
+      RandomWalkComputation.class.getName() + ".l1NormOfProbabilityDifference";
+  /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
+  private final DoubleWritable doubleWritable = new DoubleWritable();
+  /** Reusable {@link LongWritable} for counting dangling vertices */
+  private final LongWritable one = new LongWritable(1);
+
+  /**
+   * Compute an initial probability value for the vertex. Per default,
+   * we start with a uniform distribution.
+   * @return The initial probability value.
+   */
+  protected double initialProbability() {
+    return 1.0 / getTotalNumVertices();
+  }
+
+  /**
+   * Compute the probability of transitioning to a neighbor vertex
+   * @param vertex Vertex
+   * @param stateProbability current steady state probability of the vertex
+   * @param edge edge to neighbor
+   * @return the probability of transitioning to a neighbor vertex
+   */
+  protected abstract double transitionProbability(
+      Vertex<LongWritable, DoubleWritable, E> vertex,
+      double stateProbability,
+      Edge<LongWritable, E> edge);
+
+  /**
+   * Perform a single step of a random walk computation.
+   * @param vertex Vertex
+   * @param messages Messages received in the previous step.
+   * @param teleportationProbability Probability of teleporting to another
+   *          vertex.
+   * @return The new probability distribution value.
+   */
+  protected abstract double recompute(
+      Vertex<LongWritable, DoubleWritable, E> vertex,
+      Iterable<DoubleWritable> messages,
+      double teleportationProbability);
+
+  /**
+   * Returns the cumulative probability from dangling vertices.
+   * @return The cumulative probability from dangling vertices.
+   */
+  protected double getDanglingProbability() {
+    return this.<DoubleWritable>getAggregatedValue(
+        RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
+  }
+
+  /**
+   * Returns the cumulative probability from dangling vertices.
+   * @return The cumulative probability from dangling vertices.
+   */
+  protected double getPreviousCumulativeProbability() {
+    return this.<DoubleWritable>getAggregatedValue(
+        RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
+  }
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, E> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    double stateProbability;
+
+    if (getSuperstep() > 0) {
+
+      double previousStateProbability = vertex.getValue().get();
+      stateProbability =
+          recompute(vertex, messages, teleportationProbability());
+
+      // Important: rescale for numerical stability
+      stateProbability /= getPreviousCumulativeProbability();
+
+      doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
+      aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
+
+    } else {
+      stateProbability = initialProbability();
+    }
+
+    vertex.getValue().set(stateProbability);
+
+    aggregate(CUMULATIVE_PROBABILITY, vertex.getValue());
+
+    // Compute dangling node contribution for next superstep
+    if (vertex.getNumEdges() == 0) {
+      aggregate(NUM_DANGLING_VERTICES, one);
+      aggregate(CUMULATIVE_DANGLING_PROBABILITY, vertex.getValue());
+    }
+
+    if (getSuperstep() < maxSupersteps()) {
+      for (Edge<LongWritable, E> edge : vertex.getEdges()) {
+        double transitionProbability =
+            transitionProbability(vertex, stateProbability, edge);
+        doubleWritable.set(transitionProbability);
+        sendMessage(edge.getTargetVertexId(), doubleWritable);
+      }
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+
+  /**
+   * Reads the number of supersteps to execute from the configuration
+   * @return number of supersteps to execute
+   */
+  private int maxSupersteps() {
+    return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
+  }
+
+  /**
+   * Reads the teleportation probability from the configuration
+   * @return teleportation probability
+   */
+  protected double teleportationProbability() {
+    return ((RandomWalkWorkerContext) getWorkerContext())
+        .getTeleportationProbability();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
deleted file mode 100644
index 2d2c988..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Base class for executing a random walk on a graph
- *
- * @param <E> edge type
- */
-public abstract class RandomWalkVertex<E extends Writable>
-    extends Vertex<LongWritable, DoubleWritable, E, DoubleWritable> {
-  /** Configuration parameter for the number of supersteps to execute */
-  static final String MAX_SUPERSTEPS = RandomWalkVertex.class.getName() +
-      ".maxSupersteps";
-  /** Configuration parameter for the teleportation probability */
-  static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
-      .getName() + ".teleportationProbability";
-  /** Name of aggregator for the probability of dangling vertices */
-  static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class
-      .getName() + ".cumulativeDanglingProbability";
-  /** Name of aggregator for the probability of all vertices */
-  static final String CUMULATIVE_PROBABILITY = RandomWalkVertex.class
-      .getName() + ".cumulativeProbability";
-    /** Name of aggregator for the probability of dangling vertices */
-  static final String NUM_DANGLING_VERTICES = RandomWalkVertex.class
-      .getName() + ".numDanglingVertices";
-  /** Name of aggregator for the L1 norm of the probability difference, used
-   * for covergence detection */
-  static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class
-      .getName() + ".l1NormOfProbabilityDifference";
-  /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
-  private final DoubleWritable doubleWritable = new DoubleWritable();
-  /** Reusable {@link LongWritable} for counting dangling vertices */
-  private final LongWritable one = new LongWritable(1);
-
-  /**
-   * Compute an initial probability value for the vertex. Per default,
-   * we start with a uniform distribution.
-   * @return The initial probability value.
-   */
-  protected double initialProbability() {
-    return 1.0 / getTotalNumVertices();
-  }
-
-  /**
-   * Compute the probability of transitioning to a neighbor vertex
-   * @param stateProbability current steady state probability of the vertex
-   * @param edge edge to neighbor
-   * @return the probability of transitioning to a neighbor vertex
-   */
-  protected abstract double transitionProbability(double stateProbability,
-      Edge<LongWritable, E> edge);
-
-  /**
-   * Perform a single step of a random walk computation.
-   * @param messages Messages received in the previous step.
-   * @param teleportationProbability Probability of teleporting to another
-   *          vertex.
-   * @return The new probability distribution value.
-   */
-  protected abstract double recompute(Iterable<DoubleWritable> messages,
-      double teleportationProbability);
-
-  /**
-   * Returns the cumulative probability from dangling vertices.
-   * @return The cumulative probability from dangling vertices.
-   */
-  protected double getDanglingProbability() {
-    return this.<DoubleWritable>getAggregatedValue(
-        RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
-  }
-
-  /**
-   * Returns the cumulative probability from dangling vertices.
-   * @return The cumulative probability from dangling vertices.
-   */
-  protected double getPreviousCumulativeProbability() {
-    return this.<DoubleWritable>getAggregatedValue(
-        RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
-  }
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    double stateProbability;
-
-    if (getSuperstep() > 0) {
-
-      double previousStateProbability = getValue().get();
-      stateProbability = recompute(messages, teleportationProbability());
-
-      // Important: rescale for numerical stability
-      stateProbability /= getPreviousCumulativeProbability();
-
-      doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
-      aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
-
-    } else {
-      stateProbability = initialProbability();
-    }
-
-    getValue().set(stateProbability);
-
-    aggregate(CUMULATIVE_PROBABILITY, getValue());
-
-    // Compute dangling node contribution for next superstep
-    if (getNumEdges() == 0) {
-      aggregate(NUM_DANGLING_VERTICES, one);
-      aggregate(CUMULATIVE_DANGLING_PROBABILITY, getValue());
-    }
-
-    if (getSuperstep() < maxSupersteps()) {
-      for (Edge<LongWritable, E> edge : getEdges()) {
-        double transitionProbability =
-            transitionProbability(stateProbability, edge);
-        doubleWritable.set(transitionProbability);
-        sendMessage(edge.getTargetVertexId(), doubleWritable);
-      }
-    } else {
-      voteToHalt();
-    }
-  }
-
-  /**
-   * Reads the number of supersteps to execute from the configuration
-   * @return number of supersteps to execute
-   */
-  private int maxSupersteps() {
-    return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
-  }
-
-  /**
-   * Reads the teleportation probability from the configuration
-   * @return teleportation probability
-   */
-  protected double teleportationProbability() {
-    return ((RandomWalkWorkerContext) getWorkerContext())
-        .getTeleportationProbability();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
index 9e5dbbf..8b5f23b 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
 
 /**
- * Master compute associated with {@link RandomWalkVertex}. It handles
+ * Master compute associated with {@link RandomWalkComputation}. It handles
  * dangling nodes.
  */
 public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
@@ -42,16 +42,16 @@ public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
   public void compute() {
     double danglingContribution =
         this.<DoubleWritable>getAggregatedValue(
-            RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+            RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
     double cumulativeProbability =
         this.<DoubleWritable>getAggregatedValue(
-            RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
+            RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
     double l1NormOfStateDiff =
         this.<DoubleWritable>getAggregatedValue(
-            RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
+            RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
     long numDanglingVertices =
         this.<LongWritable>getAggregatedValue(
-            RandomWalkVertex.NUM_DANGLING_VERTICES).get();
+            RandomWalkComputation.NUM_DANGLING_VERTICES).get();
 
     LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
         danglingContribution + ", number of dangling vertices = " +
@@ -69,13 +69,13 @@ public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
   @Override
   public void initialize() throws InstantiationException,
       IllegalAccessException {
-    registerAggregator(RandomWalkVertex.NUM_DANGLING_VERTICES,
+    registerAggregator(RandomWalkComputation.NUM_DANGLING_VERTICES,
         LongSumAggregator.class);
-    registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
+    registerAggregator(RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY,
         DoubleSumAggregator.class);
-    registerAggregator(RandomWalkVertex.CUMULATIVE_PROBABILITY,
+    registerAggregator(RandomWalkComputation.CUMULATIVE_PROBABILITY,
         DoubleSumAggregator.class);
-    registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
+    registerAggregator(RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE,
         DoubleSumAggregator.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java
new file mode 100644
index 0000000..94e5d60
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Executes "RandomWalkWithRestart", a random walk on the graph which is biased
+ * towards a source vertex. The resulting probabilities of staying at a given
+ * vertex can be interpreted as a measure of proximity to the source vertex.
+ */
+public class RandomWalkWithRestartComputation
+    extends RandomWalkComputation<DoubleWritable> {
+
+  /** Configuration parameter for the source vertex */
+  static final String SOURCE_VERTEX = RandomWalkWithRestartComputation.class
+      .getName() + ".sourceVertex";
+
+  /**
+   * Checks whether the currently executed vertex is the source vertex
+   * @param vertex Vertex
+   * @return is the currently executed vertex the source vertex?
+   */
+  private boolean isSourceVertex(Vertex<LongWritable, ?, ?> vertex) {
+    return ((RandomWalkWorkerContext) getWorkerContext()).isSource(
+        vertex.getId().get());
+  }
+
+  /**
+   * Returns the number of source vertices.
+   * @return The number of source vertices.
+   */
+  private int numSourceVertices() {
+    return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
+  }
+
+  @Override
+  protected double transitionProbability(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable>
+          vertex,
+      double stateProbability, Edge<LongWritable, DoubleWritable> edge) {
+    return stateProbability * edge.getValue().get();
+  }
+
+  @Override
+  protected double recompute(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+      Iterable<DoubleWritable> transitionProbabilities,
+      double teleportationProbability) {
+    int numSourceVertices = numSourceVertices();
+    Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
+
+    double stateProbability = MathUtils.sum(transitionProbabilities);
+    // Add the contribution of dangling nodes (weakly preferential
+    // implementation: dangling nodes redistribute uniformly)
+    stateProbability += getDanglingProbability() / getTotalNumVertices();
+    // The random walk might teleport back to one of the source vertexes
+    stateProbability *= 1 - teleportationProbability;
+    if (isSourceVertex(vertex)) {
+      stateProbability += teleportationProbability / numSourceVertices;
+    }
+    return stateProbability;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
deleted file mode 100644
index 6f3eb6c..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import com.google.common.base.Preconditions;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.utils.MathUtils;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Executes "RandomWalkWithRestart", a random walk on the graph which is biased
- * towards a source vertex. The resulting probabilities of staying at a given
- * vertex can be interpreted as a measure of proximity to the source vertex.
- */
-public class RandomWalkWithRestartVertex
-    extends RandomWalkVertex<DoubleWritable> {
-
-  /** Configuration parameter for the source vertex */
-  static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
-      .getName() + ".sourceVertex";
-
-  /**
-   * Checks whether the currently executed vertex is the source vertex
-   * @return is the currently executed vertex the source vertex?
-   */
-  private boolean isSourceVertex() {
-    return ((RandomWalkWorkerContext) getWorkerContext()).isSource(getId()
-        .get());
-  }
-
-  /**
-   * Returns the number of source vertices.
-   * @return The number of source vertices.
-   */
-  private int numSourceVertices() {
-    return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
-  }
-
-  @Override
-  protected double transitionProbability(double stateProbability,
-      Edge<LongWritable, DoubleWritable> edge) {
-    return stateProbability * edge.getValue().get();
-  }
-
-  @Override
-  protected double recompute(Iterable<DoubleWritable> transitionProbabilities,
-      double teleportationProbability) {
-
-    int numSourceVertices = numSourceVertices();
-    Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
-
-    double stateProbability = MathUtils.sum(transitionProbabilities);
-    // Add the contribution of dangling nodes (weakly preferential
-    // implementation: dangling nodes redistribute uniformly)
-    stateProbability += getDanglingProbability() / getTotalNumVertices();
-    // The random walk might teleport back to one of the source vertexes
-    stateProbability *= 1 - teleportationProbability;
-    if (isSourceVertex()) {
-      stateProbability += teleportationProbability / numSourceVertices;
-    }
-    return stateProbability;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
index 2566f43..5c23b5a 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
@@ -48,8 +48,8 @@ public class RandomWalkWorkerContext extends WorkerContext {
   private static Set<Long> SOURCES;
 
   /** Configuration parameter for the source vertex */
-  private static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
-      .getName() + ".sourceVertex";
+  private static final String SOURCE_VERTEX =
+      RandomWalkWithRestartComputation.class.getName() + ".sourceVertex";
 
   /** Logger */
   private static final Logger LOG = Logger
@@ -143,10 +143,10 @@ public class RandomWalkWorkerContext extends WorkerContext {
   public void preApplication() throws InstantiationException,
       IllegalAccessException {
     Configuration configuration = getContext().getConfiguration();
-    MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS,
+    MAX_SUPERSTEPS = configuration.getInt(RandomWalkComputation.MAX_SUPERSTEPS,
         DEFAULT_MAX_SUPERSTEPS);
     TELEPORTATION_PROBABILITY = configuration.getFloat(
-        RandomWalkVertex.TELEPORTATION_PROBABILITY,
+        RandomWalkComputation.TELEPORTATION_PROBABILITY,
         DEFAULT_TELEPORTATION_PROBABILITY);
     SOURCES = initializeSources(configuration);
   }


Mime
View raw message