giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [1/6] GIRAPH-470 (tavoaqp via nitay)
Date Sun, 10 Feb 2013 02:13:21 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java b/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
new file mode 100644
index 0000000..0427b85
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.SimpleMutateGraphVertex;
+import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
+import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test case that runs {@link SimpleMutateGraphVertex} as a complete job.
+ */
+public class TestMutateGraph extends BspCase {
+  public TestMutateGraph() {
+      super(TestMutateGraph.class.getName());
+  }
+
+  /**
+   * Runs a {@link SimpleMutateGraphVertex} job and asserts that it succeeds.
+   *
+   * @throws IOException propagated from preparing or running the job
+   * @throws ClassNotFoundException propagated from preparing or running the job
+   * @throws InterruptedException propagated from preparing or running the job
+   */
+  @Test
+  public void testMutateGraph()
+          throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(SimpleMutateGraphVertex.class);
+    classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+    classes.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+    classes.setWorkerContextClass(
+        SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes,
+        getTempPath(getCallingMethodName()));
+    assertTrue(job.run(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java b/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
new file mode 100644
index 0000000..759624b
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Checks that a job requesting an unobtainable number of workers fails.
+ */
+public class TestNotEnoughMapTasks extends BspCase {
+
+  public TestNotEnoughMapTasks() {
+    super(TestNotEnoughMapTasks.class.getName());
+  }
+
+  /**
+   * Expects job.run to return false; does nothing outside distributed mode.
+   *
+   * @throws IOException propagated from preparing or running the job
+   * @throws ClassNotFoundException propagated from preparing or running the job
+   * @throws InterruptedException propagated from preparing or running the job
+   */
+  @Test
+  public void testNotEnoughMapTasks()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    if (!runningInDistributedMode()) {
+      System.out.println(
+          "testNotEnoughMapTasks: Ignore this test in local mode.");
+      return;
+    }
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphClasses classes = new GiraphClasses();
+    classes.setVertexClass(
+        SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+    classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+    classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+
+    // Ask for Short.MAX_VALUE workers, a count the test expects to be unmet
+    final int unlikelyWorkers = Short.MAX_VALUE;
+    job.getConfiguration().setWorkerConfiguration(unlikelyWorkers,
+        unlikelyWorkers,
+        100.0f);
+    // Cap both waits at 1000 msecs so that the expected failure comes quickly
+    job.getConfiguration().setMaxMasterSuperstepWaitMsecs(1000);
+    job.getConfiguration().setEventWaitMsecs(1000);
+    assertFalse(job.run(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
new file mode 100644
index 0000000..cdf1f65
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.PartitionContextTestVertex;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.partition.HashMasterPartitioner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+public class TestPartitionContext extends BspCase {
+  public TestPartitionContext() {
+    super(TestPartitionContext.class.getName());
+  }
+  /** Runs a multithreaded PartitionContextTestVertex job; local mode only. */
+  @Test
+  public void testPartitionContext() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    if (runningInDistributedMode()) {
+      System.out.println(
+          "testPartitionContext: Ignore this test in distributed mode.");
+      return;
+    }
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(PartitionContextTestVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    classes.setWorkerContextClass(
+        PartitionContextTestVertex.TestPartitionContextWorkerContext.class);
+    classes.setPartitionContextClass(
+        PartitionContextTestVertex.TestPartitionContextPartitionContext.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    // Compute with the thread count the test vertex declares
+    job.getConfiguration().setNumComputeThreads(
+        PartitionContextTestVertex.NUM_COMPUTE_THREADS);
+    // Set the number of vertices to the count the test vertex declares
+    job.getConfiguration().setInt(
+        GeneratedVertexReader.READER_VERTICES,
+        PartitionContextTestVertex.NUM_VERTICES);
+    // Set the user partition count to the count the test vertex declares
+    job.getConfiguration().setInt(
+        HashMasterPartitioner.USER_PARTITION_COUNT,
+        PartitionContextTestVertex.NUM_PARTITIONS);
+    assertTrue(job.run(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
new file mode 100644
index 0000000..7deeb42
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -0,0 +1,197 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.aggregators;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+/** Tests that aggregators are handled properly */
+public class TestAggregatorsHandling extends BspCase {
+
+  public TestAggregatorsHandling() {
+    super(TestAggregatorsHandling.class.getName());
+  }
+  /** Reads the private {@code aggregatorMap} field of the handler. */
+  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
+      (MasterAggregatorHandler aggregatorHandler) {
+    try {
+      Field aggregatorMapField = aggregatorHandler.getClass().getDeclaredField
+          ("aggregatorMap");
+      aggregatorMapField.setAccessible(true);
+      return (Map<String, AggregatorWrapper<Writable>>)
+          aggregatorMapField.get(aggregatorHandler);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** Tests that aggregators are handled properly during supersteps */
+  @Test
+  public void testAggregatorsHandling() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    job.getConfiguration().setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    // cap aggregator requests at 50 bytes so they get split into several
+    job.getConfiguration().setInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
+    assertTrue(job.run(true));
+  }
+
+  /** Tests that write/readFields round-trips both registered aggregators */
+  @Test
+  public void testMasterAggregatorsSerialization() throws
+      IllegalAccessException, InstantiationException, IOException {
+    ImmutableClassesGiraphConfiguration conf =
+        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
+    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
+        TextAggregatorWriter.class);
+    Progressable progressable = Mockito.mock(Progressable.class);
+    MasterAggregatorHandler handler =
+        new MasterAggregatorHandler(conf, progressable);
+
+    String regularAggName = "regular";
+    LongWritable regularValue = new LongWritable(5);
+    handler.registerAggregator(regularAggName, LongSumAggregator.class);
+    handler.setAggregatedValue(regularAggName, regularValue);
+
+    String persistentAggName = "persistent";
+    DoubleWritable persistentValue = new DoubleWritable(10.5);
+    handler.registerPersistentAggregator(persistentAggName,
+        DoubleOverwriteAggregator.class);
+    handler.setAggregatedValue(persistentAggName, persistentValue);
+
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap(handler).values()) {
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    handler.write(new DataOutputStream(out));
+
+    MasterAggregatorHandler restartedHandler =
+        new MasterAggregatorHandler(conf, progressable);
+    restartedHandler.readFields(
+        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+
+    assertEquals(2, getAggregatorMap(restartedHandler).size());
+
+    AggregatorWrapper<Writable> regularAgg =
+        getAggregatorMap(restartedHandler).get(regularAggName);
+    assertTrue(
+        regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
+    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
+    assertEquals(regularValue,
+        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
+    assertFalse(regularAgg.isPersistent());
+
+    AggregatorWrapper<Writable> persistentAgg =
+        getAggregatorMap(restartedHandler).get(persistentAggName);
+    assertTrue(persistentAgg.getAggregatorClass().equals
+        (DoubleOverwriteAggregator.class));
+    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
+    assertEquals(persistentValue,
+        restartedHandler.<DoubleWritable>getAggregatedValue(persistentAggName));
+    assertTrue(persistentAgg.isPersistent());
+  }
+
+  /**
+   * Tests that aggregators are handled properly when restarting from a
+   * checkpoint. NOTE(review): setMasterComputeClass hits the already-run job.
+   */
+  @Test
+  public void testAggregatorsCheckpointing() throws ClassNotFoundException,
+      IOException, InterruptedException {
+    Path checkpointsDir = getTempPath("checkPointsForTesting");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+
+    job.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
+        checkpointsDir.toString());
+    job.getConfiguration().setBoolean(
+        GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+    job.getConfiguration().setCheckpointFrequency(4);
+
+    assertTrue(job.run(true));
+
+    // Restart the test from superstep 4
+    System.out.println("testAggregatorsCheckpointing: Restarting from " +
+        "superstep 4 with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath(getCallingMethodName() + "Restarted");
+    classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        classes, outputPath);
+    job.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+    restartedJob.getConfiguration().set(
+        GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+    restartedJob.getConfiguration().setLong(
+        GiraphConstants.RESTART_SUPERSTEP, 4);
+
+    assertTrue(restartedJob.run(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
new file mode 100644
index 0000000..49a2f33
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Tests for {@link ConnectedComponentsVertex}
+ */
+public class ConnectedComponentsVertexTest {
+
+    /**
+     * Runs the vertex on a 13-vertex toy graph and checks every component
+     */
+    @Test
+    public void testToyData() throws Exception {
+
+        // a small graph with three components
+        String[] graph = new String[] {
+                "1 2 3",
+                "2 1 4 5",
+                "3 1 4",
+                "4 2 3 5 13",
+                "5 2 4 12 13",
+                "12 5 13",
+                "13 4 5 12",
+                // second component: 6, 7, 8, 10, 11
+                "6 7 8",
+                "7 6 10 11",
+                "8 6 10",
+                "10 7 8 11",
+                "11 7 10",
+                // third component: the single vertex 9
+                "9" };
+
+        GiraphClasses classes = new GiraphClasses();
+        classes.setVertexClass(ConnectedComponentsVertex.class);
+        classes.setCombinerClass(MinimumIntCombiner.class);
+        classes.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+        classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+        Map<String, String> emptyParams = ImmutableMap.of();
+
+        // run through InternalVertexRunner with no extra parameters
+        Iterable<String> results = InternalVertexRunner.run(classes,
+            emptyParams, graph);
+
+        SetMultimap<Integer,Integer> components = parseResults(results);
+        // components are expected to be keyed by their smallest vertex id
+        Set<Integer> componentIDs = components.keySet();
+        assertEquals(3, componentIDs.size());
+        assertTrue(componentIDs.contains(1));
+        assertTrue(componentIDs.contains(6));
+        assertTrue(componentIDs.contains(9));
+
+        Set<Integer> componentOne = components.get(1);
+        assertEquals(7, componentOne.size());
+        assertTrue(componentOne.contains(1));
+        assertTrue(componentOne.contains(2));
+        assertTrue(componentOne.contains(3));
+        assertTrue(componentOne.contains(4));
+        assertTrue(componentOne.contains(5));
+        assertTrue(componentOne.contains(12));
+        assertTrue(componentOne.contains(13));
+
+        Set<Integer> componentTwo = components.get(6);
+        assertEquals(5, componentTwo.size());
+        assertTrue(componentTwo.contains(6));
+        assertTrue(componentTwo.contains(7));
+        assertTrue(componentTwo.contains(8));
+        assertTrue(componentTwo.contains(10));
+        assertTrue(componentTwo.contains(11));
+
+        Set<Integer> componentThree = components.get(9);
+        assertEquals(1, componentThree.size());
+        assertTrue(componentThree.contains(9));
+    }
+    /** Groups vertex ids by component id from tab-separated output lines. */
+    private SetMultimap<Integer,Integer> parseResults(
+            Iterable<String> results) {
+        SetMultimap<Integer,Integer> components = HashMultimap.create();
+        for (String result : results) {
+            Iterable<String> parts = Splitter.on('\t').split(result);
+            int vertex = Integer.parseInt(Iterables.get(parts, 0));
+            int component = Integer.parseInt(Iterables.get(parts, 1));
+            components.put(component, vertex);
+        }
+        return components;
+    }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
new file mode 100644
index 0000000..434c756
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+public class MinimumIntCombinerTest {
+  /** Combines four values into one message and expects the smallest, 42. */
+  @Test
+  public void testCombiner() throws Exception {
+    Combiner<IntWritable, IntWritable> combiner =
+        new MinimumIntCombiner();
+
+    IntWritable vertexId = new IntWritable(1);
+    IntWritable result = combiner.createInitialMessage();
+    combiner.combine(vertexId, result, new IntWritable(39947466));
+    combiner.combine(vertexId, result, new IntWritable(199));
+    combiner.combine(vertexId, result, new IntWritable(42));
+    combiner.combine(vertexId, result, new IntWritable(19998888));
+    assertEquals(42, result.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
new file mode 100644
index 0000000..4052fe1
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.RandomWalkVertex.RandomWalkVertexMasterCompute;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RandomWalkWithRestartVertex}
+ */
+public class RandomWalkWithRestartVertexTest {
+
+  /** Tolerance (10e-3, i.e. 0.01) passed as delta to assertEquals */
+  private static final double EPSILON = 10e-3;
+
+  /**
+   * Local integration test on a toy graph given without edge weights
+   */
+  @Test
+  public void testToyData() throws Exception {
+
+    // A small graph
+    String[] graph = new String[] { "12 34 56", "34 78", "56 34 78", "78 34" };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+    params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+    params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.25");
+
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(RandomWalkWithRestartVertex.class);
+    classes.setVertexInputFormatClass(
+        LongDoubleFloatDoubleTextInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        VertexWithDoubleValueFloatEdgeTextOutputFormat.class);
+    classes.setWorkerContextClass(RandomWalkWorkerContext.class);
+    classes.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
+    // Run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        parseSteadyStateProbabilities(results);
+    // values computed with external software
+    // 0.25, 0.354872, 0.09375, 0.301377
+    assertEquals(0.25, steadyStateProbabilities.get(12L), EPSILON);
+    assertEquals(0.354872, steadyStateProbabilities.get(34L), EPSILON);
+    assertEquals(0.09375, steadyStateProbabilities.get(56L), EPSILON);
+    assertEquals(0.301377, steadyStateProbabilities.get(78L), EPSILON);
+  }
+
+  /**
+   * Local integration test on a toy graph with an explicit weight per edge
+   */
+  @Test
+  public void testWeightedGraph() throws Exception {
+    // A small graph
+    String[] graph =
+        new String[] { "12 34:0.1 56:0.9", "34 78:0.9 56:0.1",
+          "56 12:0.1 34:0.8 78:0.1", "78 34:1.0" };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+    params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+    params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.15");
+
+    GiraphClasses classes = new GiraphClasses();
+    classes.setVertexClass(RandomWalkWithRestartVertex.class);
+    classes.setVertexInputFormatClass(
+        NormalizingLongDoubleFloatDoubleTextInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        VertexWithDoubleValueFloatEdgeTextOutputFormat.class);
+    classes.setWorkerContextClass(RandomWalkWorkerContext.class);
+    classes.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
+    // Run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        parseSteadyStateProbabilities(results);
+    // values computed with external software
+    // 0.163365, 0.378932, 0.156886, 0.300816
+    assertEquals(0.163365, steadyStateProbabilities.get(12L), EPSILON);
+    assertEquals(0.378932, steadyStateProbabilities.get(34L), EPSILON);
+    assertEquals(0.156886, steadyStateProbabilities.get(56L), EPSILON);
+    assertEquals(0.300816, steadyStateProbabilities.get(78L), EPSILON);
+  }
+
+  /**
+   * Parse steady state probabilities.
+   * @param results Lines holding a vertex id, a tab and its probability.
+   * @return A map from vertex id to its steady state probability.
+   */
+  private Map<Long, Double> parseSteadyStateProbabilities(
+      Iterable<String> results) {
+    Map<Long, Double> result = Maps.newHashMap();
+    for (String s : results) {
+      String[] tokens = s.split("\\t");
+      Long id = Long.parseLong(tokens[0]);
+      Double value = Double.parseDouble(tokens[1]);
+      result.put(id, value);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
new file mode 100644
index 0000000..7a7b148
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Contains a simple unit test for {@link SimpleShortestPathsVertex}
+ */
+public class SimpleShortestPathsVertexTest {
+
+  /**
+   * Test the behavior when a shorter path to a vertex has been found
+   */
+  @Test
+  public void testOnShorterPathFound() throws Exception {
+    // Vertex under test with out-edges 10 (weight 2.5) and 20 (weight 0.5)
+    SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+    vertex.initialize(null, null);
+    vertex.addEdge(new DefaultEdge<LongWritable, FloatWritable>(
+        new LongWritable(10L), new FloatWritable(2.5f)));
+    vertex.addEdge(new DefaultEdge<LongWritable, FloatWritable>(
+        new LongWritable(20L), new FloatWritable(0.5f)));
+    // Prepared with Double.MAX_VALUE, so any incoming message is smaller
+    MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+        new LongWritable(7L), new DoubleWritable(Double.MAX_VALUE),
+        false);
+    // Stub the SOURCE_ID lookup on the mocked configuration to return 2
+    Mockito.when(env.getConfiguration().getLong(
+        SimpleShortestPathsVertex.SOURCE_ID,
+        SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+    // Deliver two candidate distances; 1.5 is the smaller one
+    vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+        new DoubleWritable(1.5)));
+    // The vertex must be halted and hold the smaller message as its value
+    assertTrue(vertex.isHalted());
+    assertEquals(1.5d, vertex.getValue().get(), 0d);
+    // Expected messages are 1.5 plus each edge weight: 4 to 10, 2 to 20
+    env.verifyMessageSent(new LongWritable(10L), new DoubleWritable(4));
+    env.verifyMessageSent(new LongWritable(20L), new DoubleWritable(2));
+  }
+
+  /**
+   * Test the behavior when a new, but not shorter path to a vertex has been
+   * found.
+   */
+  @Test
+  public void testOnNoShorterPathFound() throws Exception {
+    // Vertex under test with out-edges 10 (weight 2.5) and 20 (weight 0.5)
+    SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+    vertex.initialize(new LongWritable(0), new DoubleWritable(0.0));
+    vertex.addEdge(new DefaultEdge<LongWritable, FloatWritable>(
+        new LongWritable(10L), new FloatWritable(2.5f)));
+    vertex.addEdge(new DefaultEdge<LongWritable, FloatWritable>(
+        new LongWritable(20L), new FloatWritable(0.5f)));
+    // Prepared with value 0.5, which is below both messages sent later
+    MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+        new LongWritable(7L), new DoubleWritable(0.5), false);
+    // Stub the SOURCE_ID lookup on the mocked configuration to return 2
+    Mockito.when(env.getConfiguration().getLong(
+        SimpleShortestPathsVertex.SOURCE_ID,
+        SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+    // Deliver candidate distances 2 and 1.5, neither of which beats 0.5
+    vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+        new DoubleWritable(1.5)));
+    // The value must be unchanged and the vertex halted
+    assertTrue(vertex.isHalted());
+    assertEquals(0.5d, vertex.getValue().get(), 0d);
+    // Without an improvement nothing may be sent to the neighbors
+    env.verifyNoMessageSent();
+  }
+
+  /**
+   * A local integration test on toy data
+   */
+  @Test
+  public void testToyData() throws Exception {
+
+    // a small four vertex graph
+    String[] graph = new String[] {
+        "[1,0,[[2,1],[3,3]]]",
+        "[2,0,[[3,1],[4,10]]]",
+        "[3,0,[[4,2]]]",
+        "[4,0,[]]"
+    };
+
+    // start from vertex 1
+    Map<String, String> params = Maps.newHashMap();
+    params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
+
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(SimpleShortestPathsVertex.class);
+    classes.setVertexInputFormatClass(
+        JsonLongDoubleFloatDoubleVertexInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        JsonLongDoubleFloatDoubleVertexOutputFormat.class);
+
+    // run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> distances = parseDistances(results);
+
+    // verify results
+    assertNotNull(distances);
+    assertEquals(4, (int) distances.size());
+    assertEquals(0.0, (double) distances.get(1L), 0d);
+    assertEquals(1.0, (double) distances.get(2L), 0d);
+    assertEquals(2.0, (double) distances.get(3L), 0d);
+    assertEquals(4.0, (double) distances.get(4L), 0d);
+  }
+
+  private Map<Long, Double> parseDistances(Iterable<String> results) {
+    int size = Iterables.size(results);
+    Map<Long, Double> parsed = Maps.newHashMapWithExpectedSize(size);
+    for (String json : results) {  // array element 0: id, element 1: distance
+      try {
+        JSONArray fields = new JSONArray(json);
+        parsed.put(fields.getLong(0), fields.getDouble(1));
+      } catch (JSONException e) {
+        String msg = "Couldn't get vertex from line " + json;
+        throw new IllegalArgumentException(msg, e);
+      }
+    }
+    return parsed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
new file mode 100644
index 0000000..7e7b13d
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.EdgeNoValue;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Contains a simple unit test for {@link SimpleTriangleClosingVertex}
+ */
+public class SimpleTriangleClosingVertexTest {
+
+  /**
+   * Test the behavior of the triangle closing algorithm:
+   * does it send the ids of all its neighbors to every neighbor?
+   */
+  @Test
+  public void testSuperstepZero() throws Exception {
+    // vertex whose only out-edges point to 5 and 7; alw is its initial value
+    SimpleTriangleClosingVertex vertex =
+      new SimpleTriangleClosingVertex();
+    SimpleTriangleClosingVertex.IntArrayListWritable alw =
+      new SimpleTriangleClosingVertex.IntArrayListWritable();
+    vertex.initialize(null, null);
+    vertex.addEdge(new EdgeNoValue<IntWritable>(new IntWritable(5)));
+    vertex.addEdge(new EdgeNoValue<IntWritable>(new IntWritable(7)));
+    // presumably superstep 0 and vertex id 1 - confirm against MockUtils
+    MockUtils.MockedEnvironment<IntWritable,
+      SimpleTriangleClosingVertex.IntArrayListWritable,
+    NullWritable, IntWritable> env =
+      MockUtils.prepareVertex(vertex, 0L,
+        new IntWritable(1), alw, false);
+    // the incoming messages 83 and 42 do not appear in the checks below
+    vertex.compute(Lists.<IntWritable>newArrayList(
+      new IntWritable(83), new IntWritable(42)));
+    // each neighbor (5, 7) must be sent the id of every neighbor (5, 7)
+    env.verifyMessageSent(new IntWritable(5), new IntWritable(5));
+    env.verifyMessageSent(new IntWritable(5), new IntWritable(7));
+    env.verifyMessageSent(new IntWritable(7), new IntWritable(5));
+    env.verifyMessageSent(new IntWritable(7), new IntWritable(7));
+  }
+
+  /** Test behavior of compute() with incoming messages (superstep 1) */
+  @Test
+  public void testSuperstepOne() throws Exception {
+    // the messages below contain 4 three times and 7 twice; the value is
+    // expected to print as "[4, 7]" (presumably the ids seen more than once)
+    SimpleTriangleClosingVertex vertex =
+      new SimpleTriangleClosingVertex();
+    vertex.initialize(null, null);
+    MockUtils.MockedEnvironment<IntWritable,
+      SimpleTriangleClosingVertex.IntArrayListWritable,
+      NullWritable, IntWritable>
+      env = MockUtils.<IntWritable,
+      SimpleTriangleClosingVertex.IntArrayListWritable,
+      NullWritable, IntWritable> prepareVertex(
+        vertex, 1L, new IntWritable(1), null, false);
+      // superstep 1: can the vertex process these correctly?
+      vertex.compute(Lists.<IntWritable>newArrayList(
+        new IntWritable(7),
+        new IntWritable(3),
+        new IntWritable(4),
+        new IntWritable(7),
+        new IntWritable(4),
+        new IntWritable(2),
+        new IntWritable(4)));
+      final String pairCheck = "[4, 7]";
+      assertEquals(pairCheck, vertex.getValue().toString());
+  }
+ }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
new file mode 100644
index 0000000..5e61596
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.partition.HashMasterPartitioner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test page rank (with and without multithreading)
+ */
+public class TestPageRank extends BspCase {
+
+  /**
+   * Passes this class's name to the {@link BspCase} constructor
+   */
+  public TestPageRank() {
+    super(TestPageRank.class.getName());
+  }
+
+  @Test
+  public void testBspPageRankSingleCompute()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRank(1);  // argument is the number of compute threads
+  }
+
+
+  @Test
+  public void testPageRankTenThreadsCompute()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRank(10);  // argument is the number of compute threads
+  }
+
+  /**
+   * Runs SimplePageRankVertex as a job and checks its final max, min and sum
+   * @param numComputeThreads Number of compute threads to use
+   * @throws java.io.IOException May be raised preparing or running the job
+   * @throws ClassNotFoundException May be raised preparing or running the job
+   * @throws InterruptedException May be raised preparing or running the job
+   */
+  private void testPageRank(int numComputeThreads)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses<LongWritable, DoubleWritable,
+            FloatWritable, DoubleWritable>();
+    classes.setVertexClass(SimplePageRankVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    classes.setWorkerContextClass(
+        SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+    classes.setMasterComputeClass(
+        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    GiraphConfiguration conf = job.getConfiguration();
+    conf.setNumComputeThreads(numComputeThreads);
+    // Set enough partitions to generate randomness on the compute side
+    if (numComputeThreads != 1) {
+      conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT,
+          numComputeThreads * 5);
+    }
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double maxPageRank =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+      double minPageRank =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+      long numVertices =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+      System.out.println(getCallingMethodName() + ": maxPageRank=" +
+          maxPageRank + " minPageRank=" +
+          minPageRank + " numVertices=" + numVertices + ", " +
+          " numComputeThreads=" + numComputeThreads);
+      assertEquals(34.03, maxPageRank, 0.001);
+      assertEquals(0.03, minPageRank, 0.00001);
+      assertEquals(5L, numVertices);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
new file mode 100644
index 0000000..2f9704d
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Runs connected components while the first IPC port bind attempt fails.
+ */
+public class TryMultiIpcBindingPortsTest {
+
+    /**
+     * A local integration test on toy data
+     */
+    @Test
+    public void testToyData() throws Exception {
+        // NOTE(review): each line looks like "id neighbor..." - confirm
+        // a small graph with three components
+        String[] graph = new String[] {
+                "1 2 3",
+                "2 1 4 5",
+                "3 1 4",
+                "4 2 3 5 13",
+                "5 2 4 12 13",
+                "12 5 13",
+                "13 4 5 12",
+                // second component: 6, 7, 8, 10, 11
+                "6 7 8",
+                "7 6 10 11",
+                "8 6 10",
+                "10 7 8 11",
+                "11 7 10",
+                // third component: the single vertex 9
+                "9" };
+
+        // run internally
+        // fail the first port binding attempt
+        Map<String, String> params = Maps.<String, String>newHashMap();
+        params.put(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT, "true");
+
+        GiraphClasses classes = new GiraphClasses();
+        classes.setVertexClass(ConnectedComponentsVertex.class);
+        classes.setCombinerClass(MinimumIntCombiner.class);
+        classes.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+        classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+        Iterable<String> results = InternalVertexRunner.run(classes, params,
+                graph);
+
+        SetMultimap<Integer,Integer> components = parseResults(results);
+        // the asserted ids 1, 6 and 9 are each component's smallest vertex id
+        Set<Integer> componentIDs = components.keySet();
+        assertEquals(3, componentIDs.size());
+        assertTrue(componentIDs.contains(1));
+        assertTrue(componentIDs.contains(6));
+        assertTrue(componentIDs.contains(9));
+
+        Set<Integer> componentOne = components.get(1);
+        assertEquals(7, componentOne.size());
+        assertTrue(componentOne.contains(1));
+        assertTrue(componentOne.contains(2));
+        assertTrue(componentOne.contains(3));
+        assertTrue(componentOne.contains(4));
+        assertTrue(componentOne.contains(5));
+        assertTrue(componentOne.contains(12));
+        assertTrue(componentOne.contains(13));
+
+        Set<Integer> componentTwo = components.get(6);
+        assertEquals(5, componentTwo.size());
+        assertTrue(componentTwo.contains(6));
+        assertTrue(componentTwo.contains(7));
+        assertTrue(componentTwo.contains(8));
+        assertTrue(componentTwo.contains(10));
+        assertTrue(componentTwo.contains(11));
+
+        Set<Integer> componentThree = components.get(9);
+        assertEquals(1, componentThree.size());
+        assertTrue(componentThree.contains(9));
+    }
+
+    private SetMultimap<Integer,Integer> parseResults(
+            Iterable<String> results) {
+        // component id -> ids of the vertices labelled with it
+        SetMultimap<Integer,Integer> byComponent = HashMultimap.create();
+        for (String line : results) {
+            Iterable<String> cols = Splitter.on('\t').split(line);
+            Integer vertexId = Integer.valueOf(Iterables.get(cols, 0));
+            byComponent.put(Integer.valueOf(Iterables.get(cols, 1)), vertexId);
+        }
+        return byComponent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
new file mode 100644
index 0000000..80187ef
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestVertexTypes.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.job.GiraphTypeValidator;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.JsonBase64VertexInputFormat;
+import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestVertexTypes {
+
+    /**
+     * No-op vertex whose types match the {@link GeneratedVertexInputFormat}
+     */
+    private static class GeneratedVertexMatch extends
+            EdgeListVertex<LongWritable, IntWritable, FloatWritable,
+            FloatWritable> {
+        @Override
+        public void compute(Iterable<FloatWritable> messages)
+            throws IOException {  // empty: the tests only validate types
+        }
+    }
+
+    /**
+     * Empty subclass, so it matches the {@link GeneratedVertexInputFormat} too
+     */
+    private static class DerivedVertexMatch extends GeneratedVertexMatch {
+    }
+
+    /**
+     * Mismatches {@link GeneratedVertexInputFormat}: 2nd type is FloatWritable
+     */
+    private static class GeneratedVertexMismatch extends
+            EdgeListVertex<LongWritable, FloatWritable, FloatWritable,
+            FloatWritable> {
+        @Override
+        public void compute(Iterable<FloatWritable> messages)
+                throws IOException {  // empty: the tests only validate types
+        }
+    }
+
+    /**
+     * No-op FloatWritable combiner; matches the {@link GeneratedVertexMatch}
+     */
+    private static class GeneratedVertexMatchCombiner extends
+        Combiner<LongWritable, FloatWritable> {
+      @Override
+      public void combine(LongWritable vertexIndex,
+          FloatWritable originalMessage,
+          FloatWritable messageToCombine) {  // intentionally does nothing
+      }
+
+      @Override
+      public FloatWritable createInitialMessage() {
+        return null;  // no initial message is created by this stub
+      }
+    }
+
+    /**
+     * No-op DoubleWritable combiner; mismatches {@link GeneratedVertexMatch}
+     */
+    private static class GeneratedVertexMismatchCombiner extends
+        Combiner<LongWritable, DoubleWritable> {
+      @Override
+      public void combine(LongWritable vertexIndex,
+          DoubleWritable originalMessage,
+          DoubleWritable messageToCombine) {  // intentionally does nothing
+      }
+
+      @Override
+      public DoubleWritable createInitialMessage() {
+        return null;  // no initial message is created by this stub
+      }
+    }
+
+    /** Matching vertex, input format and combiner must validate. */
+    @Test
+    public void testMatchingType() throws SecurityException,
+            NoSuchMethodException, NoSuchFieldException {
+        Configuration typeConf = new Configuration();
+        typeConf.setClass(GiraphConstants.VERTEX_CLASS,
+                GeneratedVertexMatch.class, Vertex.class);
+        typeConf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+                SimpleSuperstepVertexInputFormat.class,
+                VertexInputFormat.class);
+        typeConf.setClass(GiraphConstants.VERTEX_COMBINER_CLASS,
+                GeneratedVertexMatchCombiner.class, Combiner.class);
+        // No exception from validateClassTypes() means the test passes
+        @SuppressWarnings("rawtypes")
+        GiraphTypeValidator<?, ?, ?, ?> typeValidator =
+                new GiraphTypeValidator(typeConf);
+        typeValidator.validateClassTypes();
+    }
+
+    /** A subclass of the matching vertex must validate as well. */
+    @Test
+    public void testDerivedMatchingType() throws SecurityException,
+            NoSuchMethodException, NoSuchFieldException {
+        Configuration typeConf = new Configuration();
+        typeConf.setClass(GiraphConstants.VERTEX_CLASS,
+                DerivedVertexMatch.class, Vertex.class);
+        typeConf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+                SimpleSuperstepVertexInputFormat.class,
+                VertexInputFormat.class);
+        @SuppressWarnings("rawtypes")
+        GiraphTypeValidator<?, ?, ?, ?> typeValidator =
+                new GiraphTypeValidator(typeConf);
+        typeValidator.validateClassTypes();
+    }
+
+    /** DerivedVertexMatch plus SimpleSuperstepVertexInputFormat validates. */
+    @Test
+    public void testDerivedInputFormatType() throws SecurityException,
+            NoSuchMethodException, NoSuchFieldException {
+        Configuration typeConf = new Configuration();
+        typeConf.setClass(GiraphConstants.VERTEX_CLASS,
+                DerivedVertexMatch.class, Vertex.class);
+        typeConf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+                SimpleSuperstepVertexInputFormat.class,
+                VertexInputFormat.class);
+        @SuppressWarnings("rawtypes")
+        GiraphTypeValidator<?, ?, ?, ?> typeValidator =
+                new GiraphTypeValidator(typeConf);
+        typeValidator.validateClassTypes();
+    }
+
+    /** GeneratedVertexMismatch must make validation throw. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testMismatchingVertex() throws SecurityException,
+            NoSuchMethodException, NoSuchFieldException {
+        Configuration typeConf = new Configuration();
+        typeConf.setClass(GiraphConstants.VERTEX_CLASS,
+                GeneratedVertexMismatch.class, Vertex.class);
+        typeConf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+                SimpleSuperstepVertexInputFormat.class,
+                VertexInputFormat.class);
+        @SuppressWarnings("rawtypes")
+        GiraphTypeValidator<?, ?, ?, ?> typeValidator =
+                new GiraphTypeValidator(typeConf);
+        typeValidator.validateClassTypes();
+    }
+
+    /** GeneratedVertexMismatchCombiner must make validation throw. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testMismatchingCombiner() throws SecurityException,
+            NoSuchMethodException, NoSuchFieldException {
+        Configuration typeConf = new Configuration();
+        typeConf.setClass(GiraphConstants.VERTEX_CLASS,
+                GeneratedVertexMatch.class, Vertex.class);
+        typeConf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+                SimpleSuperstepVertexInputFormat.class,
+                VertexInputFormat.class);
+        typeConf.setClass(GiraphConstants.VERTEX_COMBINER_CLASS,
+                GeneratedVertexMismatchCombiner.class, Combiner.class);
+        @SuppressWarnings("rawtypes")
+        GiraphTypeValidator<?, ?, ?, ?> typeValidator =
+                new GiraphTypeValidator(typeConf);
+        typeValidator.validateClassTypes();
+    }
+
+    /** The JsonBase64 input and output formats must validate. */
+    @Test
+    public void testJsonBase64FormatType() throws SecurityException,
+            NoSuchMethodException, NoSuchFieldException {
+        Configuration typeConf = new Configuration();
+        typeConf.setClass(GiraphConstants.VERTEX_CLASS,
+                GeneratedVertexMatch.class, Vertex.class);
+        typeConf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+                JsonBase64VertexInputFormat.class,
+                VertexInputFormat.class);
+        typeConf.setClass(GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS,
+                JsonBase64VertexOutputFormat.class, VertexOutputFormat.class);
+        // No exception from validateClassTypes() means the test passes
+        @SuppressWarnings("rawtypes")
+        GiraphTypeValidator<?, ?, ?, ?> typeValidator =
+                new GiraphTypeValidator(typeConf);
+        typeValidator.validateClassTypes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4721ced..f6e9302 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1025,6 +1025,7 @@ under the License.
 
   <modules>
     <module>giraph-core</module>
+    <module>giraph-examples</module>
   </modules>
 
 </project>


Mime
View raw message