giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [3/23] GIRAPH-409: Refactor / cleanups (nitay)
Date Fri, 04 Jan 2013 20:52:39 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestVertexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestVertexTypes.java b/giraph-core/src/test/java/org/apache/giraph/TestVertexTypes.java
deleted file mode 100644
index e1f1001..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestVertexTypes.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.graph.Combiner;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.GiraphTypeValidator;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexOutputFormat;
-import org.apache.giraph.io.GeneratedVertexInputFormat;
-import org.apache.giraph.io.JsonBase64VertexInputFormat;
-import org.apache.giraph.io.JsonBase64VertexOutputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Test;
-
-import java.io.IOException;
-
-
-public class TestVertexTypes {
-
-    /**
-     * Matches the {@link GeneratedVertexInputFormat}
-     */
-    private static class GeneratedVertexMatch extends
-            EdgeListVertex<LongWritable, IntWritable, FloatWritable,
-            FloatWritable> {
-        @Override
-        public void compute(Iterable<FloatWritable> messages)
-            throws IOException {
-        }
-    }
-
-    /**
-     * Matches the {@link GeneratedVertexInputFormat}
-     */
-    private static class DerivedVertexMatch extends GeneratedVertexMatch {
-    }
-
-    /**
-     * Mismatches the {@link GeneratedVertexInputFormat}
-     */
-    private static class GeneratedVertexMismatch extends
-            EdgeListVertex<LongWritable, FloatWritable, FloatWritable,
-            FloatWritable> {
-        @Override
-        public void compute(Iterable<FloatWritable> messages)
-                throws IOException {
-        }
-    }
-
-    /**
-     * Matches the {@link GeneratedVertexMatch}
-     */
-    private static class GeneratedVertexMatchCombiner extends
-        Combiner<LongWritable, FloatWritable> {
-      @Override
-      public void combine(LongWritable vertexIndex,
-          FloatWritable originalMessage,
-          FloatWritable messageToCombine) {
-      }
-
-      @Override
-      public FloatWritable createInitialMessage() {
-        return null;
-      }
-    }
-
-    /**
-     * Mismatches the {@link GeneratedVertexMatch}
-     */
-    private static class GeneratedVertexMismatchCombiner extends
-        Combiner<LongWritable, DoubleWritable> {
-      @Override
-      public void combine(LongWritable vertexIndex,
-          DoubleWritable originalMessage,
-          DoubleWritable messageToCombine) {
-      }
-
-      @Override
-      public DoubleWritable createInitialMessage() {
-        return null;
-      }
-    }
-
-    @Test
-    public void testMatchingType() throws SecurityException,
-            NoSuchMethodException, NoSuchFieldException {
-        Configuration conf = new Configuration();
-        conf.setClass(GiraphConstants.VERTEX_CLASS,
-                      GeneratedVertexMatch.class,
-                      Vertex.class);
-        conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
-                      SimpleSuperstepVertexInputFormat.class,
-                      VertexInputFormat.class);
-        conf.setClass(GiraphConstants.VERTEX_COMBINER_CLASS,
-                      GeneratedVertexMatchCombiner.class,
-                      Combiner.class);
-      @SuppressWarnings("rawtypes")
-      GiraphTypeValidator<?, ?, ?, ?> validator =
-        new GiraphTypeValidator(conf);
-      validator.validateClassTypes();
-    }
-
-    @Test
-    public void testDerivedMatchingType() throws SecurityException,
-            NoSuchMethodException, NoSuchFieldException {
-        Configuration conf = new Configuration();
-        conf.setClass(GiraphConstants.VERTEX_CLASS,
-                      DerivedVertexMatch.class,
-                      Vertex.class);
-        conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
-                      SimpleSuperstepVertexInputFormat.class,
-                      VertexInputFormat.class);
-        @SuppressWarnings("rawtypes")
-        GiraphTypeValidator<?, ?, ?, ?> validator =
-          new GiraphTypeValidator(conf);
-        validator.validateClassTypes();
-    }
-
-    @Test
-    public void testDerivedInputFormatType() throws SecurityException,
-            NoSuchMethodException, NoSuchFieldException {
-        Configuration conf = new Configuration();
-        conf.setClass(GiraphConstants.VERTEX_CLASS,
-                      DerivedVertexMatch.class,
-                      Vertex.class);
-        conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
-                      SimpleSuperstepVertexInputFormat.class,
-                      VertexInputFormat.class);
-      @SuppressWarnings("rawtypes")
-      GiraphTypeValidator<?, ?, ?, ?> validator =
-        new GiraphTypeValidator(conf);
-      validator.validateClassTypes();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testMismatchingVertex() throws SecurityException,
-      NoSuchMethodException, NoSuchFieldException {
-      Configuration conf = new Configuration();
-      conf.setClass(GiraphConstants.VERTEX_CLASS,
-        GeneratedVertexMismatch.class,
-        Vertex.class);
-        conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
-          SimpleSuperstepVertexInputFormat.class,
-          VertexInputFormat.class);
-        @SuppressWarnings("rawtypes")
-        GiraphTypeValidator<?, ?, ?, ?> validator =
-          new GiraphTypeValidator(conf);
-        validator.validateClassTypes();
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testMismatchingCombiner() throws SecurityException,
-      NoSuchMethodException, NoSuchFieldException {
-      Configuration conf = new Configuration();
-      conf.setClass(GiraphConstants.VERTEX_CLASS,
-        GeneratedVertexMatch.class, Vertex.class);
-      conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
-        SimpleSuperstepVertexInputFormat.class,
-        VertexInputFormat.class);
-      conf.setClass(GiraphConstants.VERTEX_COMBINER_CLASS,
-        GeneratedVertexMismatchCombiner.class,
-        Combiner.class);
-      @SuppressWarnings("rawtypes")
-      GiraphTypeValidator<?, ?, ?, ?> validator =
-        new GiraphTypeValidator(conf);
-      validator.validateClassTypes();
-    }
-
-    @Test
-    public void testJsonBase64FormatType() throws SecurityException,
-            NoSuchMethodException, NoSuchFieldException {
-        Configuration conf = new Configuration();
-        conf.setClass(GiraphConstants.VERTEX_CLASS,
-                      GeneratedVertexMatch.class,
-                      Vertex.class);
-        conf.setClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
-                      JsonBase64VertexInputFormat.class,
-                      VertexInputFormat.class);
-        conf.setClass(GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS,
-                      JsonBase64VertexOutputFormat.class,
-                      VertexOutputFormat.class);
-        @SuppressWarnings("rawtypes")
-        GiraphTypeValidator<?, ?, ?, ?> validator =
-          new GiraphTypeValidator(conf);
-        validator.validateClassTypes();
-    }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestZooKeeperExt.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestZooKeeperExt.java b/giraph-core/src/test/java/org/apache/giraph/TestZooKeeperExt.java
deleted file mode 100644
index 56e3dd0..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestZooKeeperExt.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test the ZooKeeperExt class.
- */
-public class TestZooKeeperExt implements Watcher {
-  /** ZooKeeperExt instance */
-  private ZooKeeperExt zooKeeperExt = null;
-  /** ZooKeeper server list */
-  private String zkList = System.getProperty("prop.zookeeper.list");
-
-  public static final String BASE_PATH = "/_zooKeeperExtTest";
-  public static final String FIRST_PATH = "/_first";
-
-  public void process(WatchedEvent event) {
-    return;
-  }
-
-  @Before
-  public void setUp() {
-    try {
-      if (zkList == null) {
-        return;
-      }
-      zooKeeperExt =
-          new ZooKeeperExt(zkList, 30 * 1000, 0, 0, this);
-      zooKeeperExt.deleteExt(BASE_PATH, -1, true);
-    } catch (KeeperException.NoNodeException e) {
-      System.out.println("Clean start: No node " + BASE_PATH);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @After
-  public void tearDown() {
-    if (zooKeeperExt == null) {
-      return;
-    }
-    try {
-      zooKeeperExt.close();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Test
-  public void testCreateExt() throws KeeperException, InterruptedException {
-    if (zooKeeperExt == null) {
-      System.out.println(
-          "testCreateExt: No prop.zookeeper.list set, skipping test");
-      return;
-    }
-    System.out.println("Created: " +
-                           zooKeeperExt.createExt(
-                               BASE_PATH + FIRST_PATH,
-                               null,
-                               Ids.OPEN_ACL_UNSAFE,
-                               CreateMode.PERSISTENT,
-                               true));
-    zooKeeperExt.deleteExt(BASE_PATH + FIRST_PATH, -1, false);
-    zooKeeperExt.deleteExt(BASE_PATH, -1, false);
-  }
-
-  @Test
-  public void testDeleteExt() throws KeeperException, InterruptedException {
-    if (zooKeeperExt == null) {
-      System.out.println(
-          "testDeleteExt: No prop.zookeeper.list set, skipping test");
-      return;
-    }
-    zooKeeperExt.createExt(BASE_PATH,
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT,
-                           false);
-    zooKeeperExt.createExt(BASE_PATH + FIRST_PATH,
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT,
-                           false);
-    try {
-      zooKeeperExt.deleteExt(BASE_PATH, -1, false);
-    } catch (KeeperException.NotEmptyException e) {
-      System.out.println(
-          "Correctly failed to delete since not recursive");
-    }
-    zooKeeperExt.deleteExt(BASE_PATH, -1, true);
-  }
-
-  @Test
-  public void testGetChildrenExt()
-      throws KeeperException, InterruptedException {
-    if (zooKeeperExt == null) {
-      System.out.println(
-          "testGetChildrenExt: No prop.zookeeper.list set, skipping test");
-      return;
-    }
-    zooKeeperExt.createExt(BASE_PATH,
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT,
-                           false);
-    zooKeeperExt.createExt(BASE_PATH + "/b",
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT_SEQUENTIAL,
-                           false);
-    zooKeeperExt.createExt(BASE_PATH + "/a",
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT_SEQUENTIAL,
-                           false);
-    zooKeeperExt.createExt(BASE_PATH + "/d",
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT_SEQUENTIAL,
-                           false);
-    zooKeeperExt.createExt(BASE_PATH + "/c",
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT_SEQUENTIAL,
-                           false);
-    List<String> fullPathList =
-        zooKeeperExt.getChildrenExt(BASE_PATH, false, false, true);
-    for (String fullPath : fullPathList) {
-      assertTrue(fullPath.contains(BASE_PATH + "/"));
-    }
-    List<String> sequenceOrderedList =
-        zooKeeperExt.getChildrenExt(BASE_PATH, false, true, true);
-    for (String fullPath : sequenceOrderedList) {
-      assertTrue(fullPath.contains(BASE_PATH + "/"));
-    }
-    assertEquals(4, sequenceOrderedList.size());
-    assertTrue(sequenceOrderedList.get(0).contains("/b"));
-    assertTrue(sequenceOrderedList.get(1).contains("/a"));
-    assertTrue(sequenceOrderedList.get(2).contains("/d"));
-    assertTrue(sequenceOrderedList.get(3).contains("/c"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
new file mode 100644
index 0000000..40db41f
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -0,0 +1,197 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.aggregators;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+/** Tests that aggregators are handled properly */
+public class TestAggregatorsHandling extends BspCase {
+
+  public TestAggregatorsHandling() {
+    super(TestAggregatorsHandling.class.getName());
+  }
+
+  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
+      (MasterAggregatorHandler aggregatorHandler) {
+    try {
+      Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
+          ("aggregatorMap");
+      aggregtorMapField.setAccessible(true);
+      return (Map<String, AggregatorWrapper<Writable>>)
+          aggregtorMapField.get(aggregatorHandler);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** Tests that aggregators are handled properly during supersteps */
+  @Test
+  public void testAggregatorsHandling() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    job.getConfiguration().setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    // test with aggregators split in a few requests
+    job.getConfiguration().setInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
+    assertTrue(job.run(true));
+  }
+
+  /** Test if aggregators serialization captures everything */
+  @Test
+  public void testMasterAggregatorsSerialization() throws
+      IllegalAccessException, InstantiationException, IOException {
+    ImmutableClassesGiraphConfiguration conf =
+        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
+    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
+        TextAggregatorWriter.class);
+    Progressable progressable = Mockito.mock(Progressable.class);
+    MasterAggregatorHandler handler =
+        new MasterAggregatorHandler(conf, progressable);
+
+    String regularAggName = "regular";
+    LongWritable regularValue = new LongWritable(5);
+    handler.registerAggregator(regularAggName, LongSumAggregator.class);
+    handler.setAggregatedValue(regularAggName, regularValue);
+
+    String persistentAggName = "persistent";
+    DoubleWritable persistentValue = new DoubleWritable(10.5);
+    handler.registerPersistentAggregator(persistentAggName,
+        DoubleOverwriteAggregator.class);
+    handler.setAggregatedValue(persistentAggName, persistentValue);
+
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap(handler).values()) {
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    handler.write(new DataOutputStream(out));
+
+    MasterAggregatorHandler restartedHandler =
+        new MasterAggregatorHandler(conf, progressable);
+    restartedHandler.readFields(
+        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+
+    assertEquals(2, getAggregatorMap(restartedHandler).size());
+
+    AggregatorWrapper<Writable> regularAgg =
+        getAggregatorMap(restartedHandler).get(regularAggName);
+    assertTrue(
+        regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
+    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
+    assertEquals(regularValue,
+        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
+    assertFalse(regularAgg.isPersistent());
+
+    AggregatorWrapper<Writable> persistentAgg =
+        getAggregatorMap(restartedHandler).get(persistentAggName);
+    assertTrue(persistentAgg.getAggregatorClass().equals
+        (DoubleOverwriteAggregator.class));
+    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
+    assertEquals(persistentValue,
+        restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
+    assertTrue(persistentAgg.isPersistent());
+  }
+
+  /**
+   * Test if aggregators are handled properly when restarting from a
+   * checkpoint
+   */
+  @Test
+  public void testAggregatorsCheckpointing() throws ClassNotFoundException,
+      IOException, InterruptedException {
+    Path checkpointsDir = getTempPath("checkPointsForTesting");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+
+    job.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
+        checkpointsDir.toString());
+    job.getConfiguration().setBoolean(
+        GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+    job.getConfiguration().setCheckpointFrequency(4);
+
+    assertTrue(job.run(true));
+
+    // Restart the test from superstep 4
+    System.out.println("testAggregatorsCheckpointing: Restarting from " +
+        "superstep 4 with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath(getCallingMethodName() + "Restarted");
+    classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        classes, outputPath);
+    job.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+    restartedJob.getConfiguration().set(
+        GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+    restartedJob.getConfiguration().setLong(
+        GiraphConstants.RESTART_SUPERSTEP, 4);
+
+    assertTrue(restartedJob.run(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/bsp/BspUtilsTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/bsp/BspUtilsTest.java b/giraph-core/src/test/java/org/apache/giraph/bsp/BspUtilsTest.java
new file mode 100644
index 0000000..5c77d12
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/bsp/BspUtilsTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Benchmark tests to ensure that object creation is fast
+ */
+public class BspUtilsTest {
+  @Rule
+  public TestName name = new TestName();
+  private static final Time TIME = SystemTime.get();
+  private static final long COUNT = 200000;
+  private Configuration conf = new Configuration();
+  private long startNanos = -1;
+  private long totalNanos = -1;
+  private long total = 0;
+  private long expected = COUNT * (COUNT - 1) / 2L;
+  private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+      LongWritable, LongWritable> configuration;
+
+  @Before
+  public void setUp() {
+    conf.setClass(GiraphConstants.VERTEX_ID_CLASS, IntWritable.class,
+        WritableComparable.class);
+    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS, LongWritable.class,
+        Writable.class);
+    conf.setClass(GiraphConstants.EDGE_VALUE_CLASS, DoubleWritable.class,
+        Writable.class);
+    conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS, LongWritable.class,
+        Writable.class);
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexClass(ImmutableVertex.class);
+    configuration =
+        new ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+            LongWritable, LongWritable>(conf);
+    total = 0;
+    System.gc();
+  }
+
+  @After
+  public void cleanUp() {
+    totalNanos = Times.getNanosSince(TIME, startNanos);
+    System.out.println(name.getMethodName() + ": took "
+        + totalNanos +
+        " ns for " + COUNT + " elements " + (totalNanos * 1f / COUNT) +
+        " ns / element");
+    assertEquals(expected, total);
+    System.gc();
+  }
+
+  @Test
+  public void testCreateClass() {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = BspUtils.createVertexValue(conf);
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  @Test
+  public void testNativeCreateClass() {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = new LongWritable();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  private Class<?> getLongWritableClass() {
+    return LongWritable.class;
+  }
+
+  @Test
+  public void testNewInstance()
+      throws IllegalAccessException, InstantiationException {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable)
+          getLongWritableClass().newInstance();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  private synchronized Class<?> getSyncLongWritableClass() {
+    return LongWritable.class;
+  }
+
+  @Test
+  public void testSyncNewInstance()
+      throws IllegalAccessException, InstantiationException {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable)
+          getSyncLongWritableClass().newInstance();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  @Test
+  public void testReflectionUtilsNewInstance()
+      throws IllegalAccessException, InstantiationException {
+    // Throwaway to put into cache
+    org.apache.hadoop.util.ReflectionUtils.newInstance(LongWritable.class,
+        null);
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable)
+          org.apache.hadoop.util.ReflectionUtils.newInstance(
+              getLongWritableClass(), null);
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  @Test
+  public void testConstructorNewInstance()
+      throws IllegalAccessException, InstantiationException,
+      NoSuchMethodException, InvocationTargetException {
+    Constructor<?> constructor = LongWritable.class.getDeclaredConstructor
+        (new Class[]{});
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable) constructor.newInstance();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  private static class ImmutableVertex extends EdgeListVertex<LongWritable,
+      LongWritable, LongWritable, LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException {
+    }
+  }
+
+  private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+      LongWritable, LongWritable> getConfiguration() {
+    return configuration;
+  }
+
+  @Test
+  public void testImmutableClassesGiraphConfigurationNewInstance() {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = getConfiguration().createVertexValue();
+      value.set(i);
+      total += value.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
index 14e590c..381ca7c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -27,8 +27,8 @@ import org.apache.giraph.comm.netty.handler.RequestServerHandler;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 4b41f63..2845c90 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -26,8 +26,8 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index d3bf7c3..3c0cd85 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -28,12 +28,12 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.graph.VertexMutations;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.Partition;
-import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index 0f9c64e..aa89b85 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -25,8 +25,8 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index c6da853..5ce7a89 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -47,7 +47,7 @@ import org.apache.giraph.comm.messages.FlushableMessageStore;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
-import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.CollectionUtils;
 import org.apache.giraph.utils.MockUtils;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java b/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
new file mode 100644
index 0000000..cd37197
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/conf/TestGiraphConfiguration.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.conf;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestGiraphConfiguration {
+  public interface If { }
+  public class A implements If { }
+  public class B implements If { }
+  public class C implements If { }
+
+  @Test
+  public void testSetClasses() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setClasses("foo", If.class, A.class, B.class);
+    Class<?>[] klasses = conf.getClasses("foo");
+    assertEquals(2, klasses.length);
+    assertEquals(A.class, klasses[0]);
+    assertEquals(B.class, klasses[1]);
+
+    try {
+      conf.setClasses("foo", A.class, B.class);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals(2, conf.getClasses("foo").length);
+    }
+
+    Class<? extends If>[] klasses2 = conf.getClassesOfType("foo", If.class);
+    assertEquals(2, klasses2.length);
+    assertEquals(A.class, klasses2[0]);
+    assertEquals(B.class, klasses2[1]);
+  }
+
+  @Test
+  public void testAddToClasses() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+
+    conf.setClasses("foo", If.class, A.class, B.class);
+    conf.addToClasses("foo", C.class, If.class);
+    Class<?>[] klasses = conf.getClasses("foo");
+    assertEquals(3, klasses.length);
+    assertEquals(A.class, klasses[0]);
+    assertEquals(B.class, klasses[1]);
+    assertEquals(C.class, klasses[2]);
+
+    conf.addToClasses("bar", B.class, If.class);
+    klasses = conf.getClasses("bar");
+    assertEquals(1, klasses.length);
+    assertEquals(B.class, klasses[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
index fa5c693..49a2f33 100644
--- a/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.giraph.examples;
 
+import org.apache.giraph.combiner.MinimumIntCombiner;
 import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.io.IdWithValueTextOutputFormat;
-import org.apache.giraph.io.IntIntNullIntTextInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
index 99d0b38..434c756 100644
--- a/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
@@ -20,7 +20,8 @@ package org.apache.giraph.examples;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.giraph.graph.Combiner;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MinimumIntCombiner;
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
index a90dae7..8037e9e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexInputFormat;
-import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexOutputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.DoubleWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java b/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
new file mode 100644
index 0000000..3909f46
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/TestPageRank.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.partition.HashMasterPartitioner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test page rank (with and without multithreading)
+ */
+public class TestPageRank extends BspCase {
+
+  /**
+   * Constructor
+   */
+  public TestPageRank() {
+    super(TestPageRank.class.getName());
+  }
+
+  @Test
+  public void testBspPageRankSingleCompute()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRank(1);
+  }
+
+
+  @Test
+  public void testPageRankTenThreadsCompute()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRank(10);
+  }
+
+  /**
+   * Generic page rank test
+   *
+   * @param numComputeThreads Number of compute threads to use
+   * @throws java.io.IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  private void testPageRank(int numComputeThreads)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(SimplePageRankVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    classes.setWorkerContextClass(
+        SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+    classes.setMasterComputeClass(
+        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    GiraphConfiguration conf = job.getConfiguration();
+    conf.setNumComputeThreads(numComputeThreads);
+    // Set enough partitions to generate randomness on the compute side
+    if (numComputeThreads != 1) {
+      conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT,
+          numComputeThreads * 5);
+    }
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double maxPageRank =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+      double minPageRank =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+      long numVertices =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+      System.out.println(getCallingMethodName() + ": maxPageRank=" +
+          maxPageRank + " minPageRank=" +
+          minPageRank + " numVertices=" + numVertices + ", " +
+          " numComputeThreads=" + numComputeThreads);
+      assertEquals(34.03, maxPageRank, 0.001);
+      assertEquals(0.03, minPageRank, 0.00001);
+      assertEquals(5l, numVertices);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
index 373483a..2f9704d 100644
--- a/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.giraph.examples;
 
+import org.apache.giraph.combiner.MinimumIntCombiner;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.io.IdWithValueTextOutputFormat;
-import org.apache.giraph.io.IntIntNullIntTextInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
deleted file mode 100644
index 7ef436e..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.BspCase;
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.examples.AggregatorsTestVertex;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Map;
-
-/** Tests if aggregators are handled on a proper way */
-public class TestAggregatorsHandling extends BspCase {
-
-  public TestAggregatorsHandling() {
-    super(TestAggregatorsHandling.class.getName());
-  }
-
-  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
-      (MasterAggregatorHandler aggregatorHandler) {
-    try {
-      Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
-          ("aggregatorMap");
-      aggregtorMapField.setAccessible(true);
-      return (Map<String, AggregatorWrapper<Writable>>)
-          aggregtorMapField.get(aggregatorHandler);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException(e);
-    } catch (NoSuchFieldException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  /** Tests if aggregators are handled on a proper way during supersteps */
-  @Test
-  public void testAggregatorsHandling() throws IOException,
-      ClassNotFoundException, InterruptedException {
-    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-        classes = new GiraphClasses();
-    classes.setVertexClass(AggregatorsTestVertex.class);
-    classes.setVertexInputFormatClass(
-        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
-    GiraphJob job = prepareJob(getCallingMethodName(), classes);
-    job.getConfiguration().setMasterComputeClass(
-        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
-    // test with aggregators split in a few requests
-    job.getConfiguration().setInt(
-        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
-    assertTrue(job.run(true));
-  }
-
-  /** Test if aggregators serialization captures everything */
-  @Test
-  public void testMasterAggregatorsSerialization() throws
-      IllegalAccessException, InstantiationException, IOException {
-    ImmutableClassesGiraphConfiguration conf =
-        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
-    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
-        TextAggregatorWriter.class);
-    Progressable progressable = Mockito.mock(Progressable.class);
-    MasterAggregatorHandler handler =
-        new MasterAggregatorHandler(conf, progressable);
-
-    String regularAggName = "regular";
-    LongWritable regularValue = new LongWritable(5);
-    handler.registerAggregator(regularAggName, LongSumAggregator.class);
-    handler.setAggregatedValue(regularAggName, regularValue);
-
-    String persistentAggName = "persistent";
-    DoubleWritable persistentValue = new DoubleWritable(10.5);
-    handler.registerPersistentAggregator(persistentAggName,
-        DoubleOverwriteAggregator.class);
-    handler.setAggregatedValue(persistentAggName, persistentValue);
-
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap(handler).values()) {
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-    }
-
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.write(new DataOutputStream(out));
-
-    MasterAggregatorHandler restartedHandler =
-        new MasterAggregatorHandler(conf, progressable);
-    restartedHandler.readFields(
-        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
-
-    assertEquals(2, getAggregatorMap(restartedHandler).size());
-
-    AggregatorWrapper<Writable> regularAgg =
-        getAggregatorMap(restartedHandler).get(regularAggName);
-    assertTrue(
-        regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
-    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
-    assertEquals(regularValue,
-        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
-    assertFalse(regularAgg.isPersistent());
-
-    AggregatorWrapper<Writable> persistentAgg =
-        getAggregatorMap(restartedHandler).get(persistentAggName);
-    assertTrue(persistentAgg.getAggregatorClass().equals
-        (DoubleOverwriteAggregator.class));
-    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
-    assertEquals(persistentValue,
-        restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
-    assertTrue(persistentAgg.isPersistent());
-  }
-
-  /**
-   * Test if aggregators are are handled properly when restarting from a
-   * checkpoint
-   */
-  @Test
-  public void testAggregatorsCheckpointing() throws ClassNotFoundException,
-      IOException, InterruptedException {
-    Path checkpointsDir = getTempPath("checkPointsForTesting");
-    Path outputPath = getTempPath(getCallingMethodName());
-    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-        classes = new GiraphClasses();
-    classes.setVertexClass(AggregatorsTestVertex.class);
-    classes.setMasterComputeClass(
-        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
-    classes.setVertexInputFormatClass(
-        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
-    GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
-
-    job.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
-        checkpointsDir.toString());
-    job.getConfiguration().setBoolean(
-        GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
-    job.getConfiguration().setCheckpointFrequency(4);
-
-    assertTrue(job.run(true));
-
-    // Restart the test from superstep 4
-    System.out.println("testAggregatorsCheckpointing: Restarting from " +
-        "superstep 4 with checkpoint path = " + checkpointsDir);
-    outputPath = getTempPath(getCallingMethodName() + "Restarted");
-    classes = new GiraphClasses();
-    classes.setVertexClass(AggregatorsTestVertex.class);
-    classes.setMasterComputeClass(
-        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
-    classes.setVertexInputFormatClass(
-        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
-    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
-        classes, outputPath);
-    job.getConfiguration().setMasterComputeClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
-    restartedJob.getConfiguration().set(
-        GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
-    restartedJob.getConfiguration().setLong(
-        GiraphConstants.RESTART_SUPERSTEP, 4);
-
-    assertTrue(restartedJob.run(true));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
deleted file mode 100644
index 98991b8..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.Lists;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Tests {@link IntIntNullIntVertex}.
- */
-public class TestIntIntNullIntVertex {
-  /**
-   * Simple instantiable class that extends {@link IntIntNullIntVertex}.
-   */
-  private static class MyIntIntNullVertex extends IntIntNullIntVertex {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException {
-    }
-  }
-
-  @Test
-  public void testSerialize() throws IOException {
-    IntIntNullIntVertex vertex = new MyIntIntNullVertex();
-
-    List<Edge<IntWritable, NullWritable>> edges = Lists.newLinkedList();
-    edges.add(new Edge<IntWritable, NullWritable>(new IntWritable(3),
-        NullWritable.get()));
-    edges.add(new Edge<IntWritable, NullWritable>(new IntWritable(47),
-        NullWritable.get()));
-
-    vertex.initialize(new IntWritable(23), new IntWritable(7), edges);
-    vertex.voteToHalt();
-
-    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-    DataOutput out = new DataOutputStream(outStream);
-    vertex.write(out);
-
-    IntIntNullIntVertex vertex1 = new MyIntIntNullVertex();
-
-    ByteArrayInputStream inStream = new ByteArrayInputStream(
-        outStream.toByteArray());
-    DataInput in = new DataInputStream(inStream);
-    vertex1.readFields(in);
-
-    assertEquals(2, vertex1.getNumEdges());
-    assertEquals(true, vertex1.isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java
deleted file mode 100644
index ead5578..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.IntWritable;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test all multigraph mutable vertices.
- */
-public class TestMultiGraphVertex {
-  private Collection<Class<? extends Vertex<IntWritable, IntWritable,
-      IntWritable, IntWritable>>> vertexClasses = Lists.newArrayList();
-
-  public static class MyMultiGraphEdgeListVertex
-      extends MultiGraphEdgeListVertex<IntWritable, IntWritable, IntWritable,
-      IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException { }
-  }
-
-  public static class MyMultiGraphRepresentativeVertex
-      extends MultiGraphRepresentativeVertex<IntWritable, IntWritable,
-      IntWritable, IntWritable> {
-    @Override
-    public void compute(Iterable<IntWritable> messages) throws IOException { }
-  }
-
-  @Before
-  public void setUp() {
-    vertexClasses.add(MyMultiGraphEdgeListVertex.class);
-    vertexClasses.add(MyMultiGraphRepresentativeVertex.class);
-  }
-
-  @Test
-  public void testAddRemoveEdges() {
-    for (Class<? extends Vertex<IntWritable, IntWritable,
-        IntWritable, IntWritable>> vertexClass : vertexClasses) {
-      testAddRemoveEdgesVertexClass(vertexClass);
-    }
-  }
-
-  private void testAddRemoveEdgesVertexClass(Class<? extends Vertex<IntWritable,
-      IntWritable, IntWritable, IntWritable>> vertexClass) {
-    MutableVertex<IntWritable, IntWritable, IntWritable,
-        IntWritable> vertex = instantiateVertex(vertexClass);
-
-    assertEquals(vertex.removeEdges(new IntWritable(1)), 0);
-
-    // We test a few different patterns for duplicate edges,
-    // in order to catch corner cases:
-
-    // Edge list of form: [A, B, A]
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
-        new IntWritable(1)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
-        new IntWritable(2)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
-        new IntWritable(10)));
-    assertEquals(vertex.getNumEdges(), 3);
-    assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
-    assertEquals(vertex.getNumEdges(), 1);
-
-    // Edge list of form: [A, B, B]
-    vertex = instantiateVertex(vertexClass);
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
-        new IntWritable(2)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
-        new IntWritable(1)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
-        new IntWritable(10)));
-    assertEquals(vertex.getNumEdges(), 3);
-    assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
-    assertEquals(vertex.getNumEdges(), 1);
-
-    // Edge list of form: [A, A, B]
-    vertex = instantiateVertex(vertexClass);
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
-        new IntWritable(1)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
-        new IntWritable(10)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
-        new IntWritable(2)));
-    assertEquals(vertex.getNumEdges(), 3);
-    assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
-    assertEquals(vertex.getNumEdges(), 1);
-  }
-
-  private MutableVertex<IntWritable, IntWritable, IntWritable, IntWritable>
-  instantiateVertex(Class<? extends Vertex<IntWritable, IntWritable,
-      IntWritable, IntWritable>> vertexClass) {
-    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
-    giraphConfiguration.setVertexClass(vertexClass);
-    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
-        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
-    MutableVertex<IntWritable, IntWritable, IntWritable,
-        IntWritable> vertex =
-        (MutableVertex<IntWritable, IntWritable,
-            IntWritable, IntWritable>)
-            immutableClassesGiraphConfiguration.createVertex();
-    vertex.initialize(new IntWritable(), new IntWritable());
-    return vertex;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
deleted file mode 100644
index 9011200..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.*;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * Test all the mutable vertices (except multigraph versions)
- */
-public class TestMutableVertex {
-  /** Number of repetitions */
-  public static final int REPS = 100;
-  /** Vertex classes to be tested filled in from setup() */
-  private Collection<
-      Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-      LongWritable>>> vertexClasses = Lists.newArrayList();
-
-  /**
-   * Simple instantiable class that extends
-   * {@link HashMapVertex}.
-   */
-  public static class IFDLHashMapVertex extends
-      HashMapVertex<IntWritable, FloatWritable, DoubleWritable,
-          LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) throws IOException { }
-  }
-
-  /**
-   * Simple instantiable class that extends
-   * {@link EdgeListVertex}.
-   */
-  public static class IFDLEdgeListVertex extends
-      EdgeListVertex<IntWritable, FloatWritable, DoubleWritable,
-      LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) throws IOException { }
-  }
-
-  /**
-   * Simple instantiable class that extends
-   * {@link RepresentativeVertex}.
-   */
-  public static class IFDLRepresentativeVertex extends
-      RepresentativeVertex<IntWritable, FloatWritable, DoubleWritable,
-                LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) throws IOException { }
-  }
-
-  @Before
-  public void setUp() {
-    vertexClasses.add(IFDLHashMapVertex.class);
-    vertexClasses.add(IFDLEdgeListVertex.class);
-    vertexClasses.add(IFDLRepresentativeVertex.class);
-  }
-
-  @Test
-  public void testInstantiate() throws IOException {
-    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-        LongWritable>> vertexClass : vertexClasses) {
-      testInstantiateVertexClass(vertexClass);
-    }
-  }
-
-  /**
-   * Test a vertex class for instantiation
-   *
-   * @param vertexClass Vertex class to check
-   * @return Instantiated mutable vertex
-   */
-  private MutableVertex<IntWritable, FloatWritable, DoubleWritable,
-      LongWritable> testInstantiateVertexClass(
-      Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-      LongWritable>> vertexClass) {
-    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
-    giraphConfiguration.setVertexClass(vertexClass);
-    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
-        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
-    MutableVertex<IntWritable, FloatWritable, DoubleWritable,
-        LongWritable> vertex =
-        (MutableVertex<IntWritable, FloatWritable,
-            DoubleWritable, LongWritable>)
-            immutableClassesGiraphConfiguration.createVertex();
-    assertNotNull(vertex);
-    return vertex;
-  }
-
-  @Test
-  public void testEdges() {
-    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-        LongWritable>> vertexClass : vertexClasses) {
-      testEdgesVertexClass(vertexClass);
-    }
-  }
-
-  /**
-   * Test a vertex class for edges
-   *
-   * @param vertexClass Vertex class to check
-   */
-  private void testEdgesVertexClass(Class<? extends Vertex<IntWritable,
-      FloatWritable, DoubleWritable, LongWritable>> vertexClass) {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        testInstantiateVertexClass(vertexClass);
-
-    List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
-    for (int i = 1000; i > 0; --i) {
-      edges.add(new Edge<IntWritable, DoubleWritable>(
-          new IntWritable(i), new DoubleWritable(i * 2.0)));
-    }
-
-    vertex.initialize(null, null, edges);
-    assertEquals(vertex.getNumEdges(), 1000);
-    for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) {
-      assertEquals(edge.getValue().get(),
-          edge.getTargetVertexId().get() * 2.0d, 0d);
-    }
-    assertEquals(vertex.removeEdges(new IntWritable(500)), 1);
-    assertEquals(vertex.getNumEdges(), 999);
-  }
-
-  @Test
-  public void testGetEdges() {
-    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-        LongWritable>> vertexClass : vertexClasses) {
-      testGetEdgesVertexClass(vertexClass);
-    }
-  }
-
-  /**
-   * Test a vertex class for getting edges
-   *
-   * @param vertexClass Vertex class to check
-   */
-  private void testGetEdgesVertexClass(Class<? extends Vertex<IntWritable,
-      FloatWritable, DoubleWritable, LongWritable>> vertexClass) {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        testInstantiateVertexClass(vertexClass);
-
-    List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
-    for (int i = 1000; i > 0; --i) {
-      edges.add(new Edge<IntWritable, DoubleWritable>(
-          new IntWritable(i), new DoubleWritable(i * 3.0)));
-    }
-
-    vertex.initialize(null, null, edges);
-    assertEquals(vertex.getNumEdges(), 1000);
-    assertEquals(vertex.getEdgeValue(new IntWritable(600)),
-        new DoubleWritable(600 * 3.0));
-    assertEquals(vertex.removeEdges(new IntWritable(600)), 1);
-    assertEquals(vertex.getNumEdges(), 999);
-    assertEquals(vertex.getEdgeValue(new IntWritable(500)),
-        new DoubleWritable(500 * 3.0));
-    assertEquals(vertex.getEdgeValue(new IntWritable(700)),
-        new DoubleWritable(700 * 3.0));
-  }
-
-  @Test
-  public void testAddRemoveEdges() {
-    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-        LongWritable>> vertexClass : vertexClasses) {
-      testAddRemoveEdgesVertexClass(vertexClass);
-    }
-  }
-
-  /**
-   * Test a vertex class for adding/removing edges
-   *
-   * @param vertexClass Vertex class to check
-   */
-  private void testAddRemoveEdgesVertexClass(Class<? extends
-      Vertex<IntWritable, FloatWritable, DoubleWritable,
-          LongWritable>> vertexClass) {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        testInstantiateVertexClass(vertexClass);
-
-    vertex.initialize(new IntWritable(0), new FloatWritable(0.0f));
-    assertEquals(vertex.getNumEdges(), 0);
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
-        new IntWritable(2),
-        new DoubleWritable(2.0))));
-    assertEquals(vertex.getNumEdges(), 1);
-    assertEquals(vertex.getEdgeValue(new IntWritable(2)),
-        new DoubleWritable(2.0));
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
-        new IntWritable(4),
-        new DoubleWritable(4.0))));
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
-        new IntWritable(3),
-        new DoubleWritable(3.0))));
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
-        new IntWritable(1),
-        new DoubleWritable(1.0))));
-    assertEquals(vertex.getNumEdges(), 4);
-    assertNull(vertex.getEdgeValue(new IntWritable(5)));
-    assertNull(vertex.getEdgeValue(new IntWritable(0)));
-    for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) {
-      assertEquals(edge.getTargetVertexId().get() * 1.0d,
-          edge.getValue().get(), 0d);
-    }
-    assertEquals(vertex.removeEdges(new IntWritable(1)), 1);
-    assertEquals(vertex.getNumEdges(), 3);
-    assertEquals(vertex.removeEdges(new IntWritable(3)), 1);
-    assertEquals(vertex.getNumEdges(), 2);
-    assertEquals(vertex.removeEdges(new IntWritable(2)), 1);
-    assertEquals(vertex.getNumEdges(), 1);
-    assertEquals(vertex.removeEdges(new IntWritable(4)), 1);
-    assertEquals(vertex.getNumEdges(), 0);
-  }
-
-  @Test
-  public void testSerialized() throws IOException {
-    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
-        LongWritable>> vertexClass : vertexClasses) {
-      testSerializeVertexClass(vertexClass);
-      testDynamicChannelBufferSerializeVertexClass(vertexClass);
-      testUnsafeSerializeVertexClass(vertexClass);
-    }
-  }
-
-  /**
-   * Build a vertex for testing
-   *
-   * @param vertexClass Vertex class to use for testing
-   * @return Vertex that has some initial data
-   */
-  private MutableVertex<IntWritable,
-      FloatWritable, DoubleWritable, LongWritable> buildVertex(Class<? extends
-      Vertex<IntWritable, FloatWritable, DoubleWritable,
-          LongWritable>> vertexClass) {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        testInstantiateVertexClass(vertexClass);
-
-    final int edgesCount = 200;
-    List<Edge<IntWritable, DoubleWritable>> edges =
-        Lists.newArrayListWithCapacity(edgesCount);
-    for (int i = edgesCount; i > 0; --i) {
-      edges.add(new Edge<IntWritable, DoubleWritable>(
-          new IntWritable(i), new DoubleWritable(i * 2.0)));
-    }
-    vertex.initialize(new IntWritable(2), new FloatWritable(3.0f), edges);
-    return vertex;
-  }
-
-  /**
-   * Test a vertex class for serializing
-   *
-   * @param vertexClass Vertex class to check
-   */
-  private void testSerializeVertexClass(Class<? extends
-      Vertex<IntWritable, FloatWritable, DoubleWritable,
-          LongWritable>> vertexClass) {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        buildVertex(vertexClass);
-
-    long serializeNanosStart = 0;
-    long serializeNanos = 0;
-    byte[] byteArray = null;
-    for (int i = 0; i < REPS; ++i) {
-      serializeNanosStart = SystemTime.get().getNanoseconds();
-      byteArray = WritableUtils.writeToByteArray(vertex);
-      serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
-          serializeNanosStart);
-    }
-    serializeNanos /= REPS;
-    System.out.println("testSerialize: Serializing took " +
-        serializeNanos +
-        " ns for " + byteArray.length + " bytes " +
-        (byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos) +
-        " bytes / sec for " + vertexClass.getName());
-
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> readVertex =
-        testInstantiateVertexClass(vertexClass);
-
-    long deserializeNanosStart = 0;
-    long deserializeNanos = 0;
-    for (int i = 0; i < REPS; ++i) {
-      deserializeNanosStart = SystemTime.get().getNanoseconds();
-      WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
-      deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
-          deserializeNanosStart);
-    }
-    deserializeNanos /= REPS;
-    System.out.println("testSerialize: " +
-        "Deserializing " +
-        "took " +
-        deserializeNanos +
-        " ns for " + byteArray.length + " bytes " +
-        (byteArray.length * 1f * Time.NS_PER_SECOND / deserializeNanos) +
-        " bytes / sec for " + vertexClass.getName());
-
-    assertEquals(vertex.getId(), readVertex.getId());
-    assertEquals(vertex.getValue(), readVertex.getValue());
-    assertEquals(Lists.newArrayList(vertex.getEdges()),
-        Lists.newArrayList(readVertex.getEdges()));
-  }
-
-  /**
-   * Test a vertex class for serializing with DynamicChannelBuffers
-   *
-   * @param vertexClass Vertex class to check
-   */
-  private void testDynamicChannelBufferSerializeVertexClass(Class<? extends
-      Vertex<IntWritable, FloatWritable, DoubleWritable,
-          LongWritable>> vertexClass) throws IOException {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        buildVertex(vertexClass);
-
-    long serializeNanosStart = 0;
-    long serializeNanos = 0;
-    DynamicChannelBufferOutputStream outputStream = null;
-    for (int i = 0; i <
-        REPS; ++i) {
-      serializeNanosStart = SystemTime.get().getNanoseconds();
-      outputStream =
-          new DynamicChannelBufferOutputStream(32);
-      vertex.write(outputStream);
-      serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
-          serializeNanosStart);
-    }
-    serializeNanos /= REPS;
-    System.out.println("testDynamicChannelBufferSerializeVertexClass: " +
-        "Serializing took " +
-        serializeNanos +
-        " ns for " + outputStream.getDynamicChannelBuffer().writerIndex()
-        + " bytes " +
-        (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
-            Time.NS_PER_SECOND / serializeNanos) +
-        " bytes / sec for " + vertexClass.getName());
-
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> readVertex =
-        testInstantiateVertexClass(vertexClass);
-
-    long deserializeNanosStart = 0;
-    long deserializeNanos = 0;
-    for (int i = 0; i < REPS; ++i) {
-      deserializeNanosStart = SystemTime.get().getNanoseconds();
-      DynamicChannelBufferInputStream inputStream = new
-          DynamicChannelBufferInputStream(
-          outputStream.getDynamicChannelBuffer());
-      readVertex.readFields(inputStream);
-      deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
-          deserializeNanosStart);
-      outputStream.getDynamicChannelBuffer().readerIndex(0);
-    }
-    deserializeNanos /= REPS;
-    System.out.println("testDynamicChannelBufferSerializeVertexClass: " +
-        "Deserializing took " +
-        deserializeNanos +
-        " ns for " + outputStream.getDynamicChannelBuffer().writerIndex() +
-        " bytes " +
-        (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
-            Time.NS_PER_SECOND / deserializeNanos) +
-        " bytes / sec for " + vertexClass.getName());
-
-    assertEquals(vertex.getId(), readVertex.getId());
-    assertEquals(vertex.getValue(), readVertex.getValue());
-    assertEquals(Lists.newArrayList(vertex.getEdges()),
-        Lists.newArrayList(readVertex.getEdges()));
-  }
-
-
-  /**
-   * Test a vertex class for serializing with UnsafeByteArray(Input/Output)
-   * Stream
-   *
-   * @param vertexClass Vertex class to check
-   */
-  private void testUnsafeSerializeVertexClass(Class<? extends
-      Vertex<IntWritable, FloatWritable, DoubleWritable,
-          LongWritable>> vertexClass) throws IOException {
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> vertex =
-        buildVertex(vertexClass);
-
-    long serializeNanosStart = 0;
-    long serializeNanos = 0;
-    UnsafeByteArrayOutputStream outputStream = null;
-    for (int i = 0; i <
-        REPS; ++i) {
-      serializeNanosStart = SystemTime.get().getNanoseconds();
-      outputStream =
-          new UnsafeByteArrayOutputStream(32);
-      vertex.write(outputStream);
-      serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
-          serializeNanosStart);
-    }
-    serializeNanos /= REPS;
-    System.out.println("testUnsafeSerializeVertexClass: " +
-        "Serializing took " +
-        serializeNanos +
-        " ns for " + outputStream.getPos()
-        + " bytes " +
-        (outputStream.getPos() * 1f *
-            Time.NS_PER_SECOND / serializeNanos) +
-        " bytes / sec for " + vertexClass.getName());
-
-    MutableVertex<IntWritable,
-        FloatWritable, DoubleWritable, LongWritable> readVertex =
-        testInstantiateVertexClass(vertexClass);
-
-    long deserializeNanosStart = 0;
-    long deserializeNanos = 0;
-    for (int i = 0; i < REPS; ++i) {
-      deserializeNanosStart = SystemTime.get().getNanoseconds();
-      UnsafeByteArrayInputStream inputStream = new
-          UnsafeByteArrayInputStream(
-          outputStream.getByteArray(), 0, outputStream.getPos());
-      readVertex.readFields(inputStream);
-      deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
-          deserializeNanosStart);
-    }
-    deserializeNanos /= REPS;
-    System.out.println("testUnsafeSerializeVertexClass: " +
-        "Deserializing took " +
-        deserializeNanos +
-        " ns for " + outputStream.getPos() +
-        " bytes " +
-        (outputStream.getPos() * 1f *
-            Time.NS_PER_SECOND / deserializeNanos) +
-        " bytes / sec for " + vertexClass.getName());
-
-    assertEquals(vertex.getId(), readVertex.getId());
-    assertEquals(vertex.getValue(), readVertex.getValue());
-    assertEquals(Lists.newArrayList(vertex.getEdges()),
-        Lists.newArrayList(readVertex.getEdges()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java b/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
deleted file mode 100644
index 07acefb..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.GiraphTransferRegulator;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Test the GiraphTransferRegulator.
- */
-public class TestGiraphTransferRegulator {
-  /** Job filled in by setup() */
-  private GiraphJob job;
-  /** Instantiated vertex filled in from setup() */
-  private IFDLEdgeListVertex vertex = new IFDLEdgeListVertex();
-
-  /**
-   * Simple instantiable class that extends
-   * {@link org.apache.giraph.graph.EdgeListVertex}.
-   */
-  public static class IFDLEdgeListVertex extends
-      EdgeListVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable> {
-    @Override
-    public void compute(Iterable<LongWritable> messages) throws IOException { }
-  }
-
-  @Before
-  public void setUp() {
-    try {
-      job = new GiraphJob("TestGiraphTransferRegulator");
-    } catch (IOException e) {
-      throw new RuntimeException("setUp: Failed", e);
-    }
-    job.getConfiguration().setVertexClass(IFDLEdgeListVertex.class);
-  }
-
-  @Test
-  public void testGiraphTransferRegulator() {
-    job.getConfiguration()
-        .setInt(GiraphTransferRegulator.MAX_VERTICES_PER_TRANSFER, 1);
-    job.getConfiguration()
-        .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3);
-    List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
-    edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(2),
-        new DoubleWritable(22)));
-    edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(3),
-        new DoubleWritable(33)));
-    edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(4),
-        new DoubleWritable(44)));
-    vertex.initialize(null, null, edges);
-    GiraphTransferRegulator gtr =
-        new GiraphTransferRegulator(job.getConfiguration());
-    PartitionOwner owner = mock(PartitionOwner.class);
-    when(owner.getPartitionId()).thenReturn(57);
-    assertFalse(gtr.transferThisPartition(owner));
-    gtr.incrementCounters(owner, vertex);
-    assertTrue(gtr.transferThisPartition(owner));
-  }
-
-}


Mime
View raw message