giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [29/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:32 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
new file mode 100644
index 0000000..7ef436e
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
@@ -0,0 +1,197 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+/** Tests if aggregators are handled in a proper way */
+public class TestAggregatorsHandling extends BspCase {
+
+  public TestAggregatorsHandling() {
+    super(TestAggregatorsHandling.class.getName());
+  }
+
+  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
+      (MasterAggregatorHandler aggregatorHandler) {
+    try {
+      Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
+          ("aggregatorMap");
+      aggregtorMapField.setAccessible(true);
+      return (Map<String, AggregatorWrapper<Writable>>)
+          aggregtorMapField.get(aggregatorHandler);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** Tests if aggregators are handled on a proper way during supersteps */
+  @Test
+  public void testAggregatorsHandling() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes);
+    job.getConfiguration().setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    // test with aggregators split in a few requests
+    job.getConfiguration().setInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
+    assertTrue(job.run(true));
+  }
+
+  /** Test if aggregators serialization captures everything */
+  @Test
+  public void testMasterAggregatorsSerialization() throws
+      IllegalAccessException, InstantiationException, IOException {
+    ImmutableClassesGiraphConfiguration conf =
+        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
+    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
+        TextAggregatorWriter.class);
+    Progressable progressable = Mockito.mock(Progressable.class);
+    MasterAggregatorHandler handler =
+        new MasterAggregatorHandler(conf, progressable);
+
+    String regularAggName = "regular";
+    LongWritable regularValue = new LongWritable(5);
+    handler.registerAggregator(regularAggName, LongSumAggregator.class);
+    handler.setAggregatedValue(regularAggName, regularValue);
+
+    String persistentAggName = "persistent";
+    DoubleWritable persistentValue = new DoubleWritable(10.5);
+    handler.registerPersistentAggregator(persistentAggName,
+        DoubleOverwriteAggregator.class);
+    handler.setAggregatedValue(persistentAggName, persistentValue);
+
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap(handler).values()) {
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    handler.write(new DataOutputStream(out));
+
+    MasterAggregatorHandler restartedHandler =
+        new MasterAggregatorHandler(conf, progressable);
+    restartedHandler.readFields(
+        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+
+    assertEquals(2, getAggregatorMap(restartedHandler).size());
+
+    AggregatorWrapper<Writable> regularAgg =
+        getAggregatorMap(restartedHandler).get(regularAggName);
+    assertTrue(
+        regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
+    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
+    assertEquals(regularValue,
+        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
+    assertFalse(regularAgg.isPersistent());
+
+    AggregatorWrapper<Writable> persistentAgg =
+        getAggregatorMap(restartedHandler).get(persistentAggName);
+    assertTrue(persistentAgg.getAggregatorClass().equals
+        (DoubleOverwriteAggregator.class));
+    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
+    assertEquals(persistentValue,
+        restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
+    assertTrue(persistentAgg.isPersistent());
+  }
+
+  /**
+   * Test if aggregators are are handled properly when restarting from a
+   * checkpoint
+   */
+  @Test
+  public void testAggregatorsCheckpointing() throws ClassNotFoundException,
+      IOException, InterruptedException {
+    Path checkpointsDir = getTempPath("checkPointsForTesting");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
+
+    job.getConfiguration().set(GiraphConstants.CHECKPOINT_DIRECTORY,
+        checkpointsDir.toString());
+    job.getConfiguration().setBoolean(
+        GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+    job.getConfiguration().setCheckpointFrequency(4);
+
+    assertTrue(job.run(true));
+
+    // Restart the test from superstep 4
+    System.out.println("testAggregatorsCheckpointing: Restarting from " +
+        "superstep 4 with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath(getCallingMethodName() + "Restarted");
+    classes = new GiraphClasses();
+    classes.setVertexClass(AggregatorsTestVertex.class);
+    classes.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    classes.setVertexInputFormatClass(
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        classes, outputPath);
+    job.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+    restartedJob.getConfiguration().set(
+        GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+    restartedJob.getConfiguration().setLong(
+        GiraphConstants.RESTART_SUPERSTEP, 4);
+
+    assertTrue(restartedJob.run(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
new file mode 100644
index 0000000..98991b8
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests {@link IntIntNullIntVertex}.
+ */
+public class TestIntIntNullIntVertex {
+  /**
+   * Minimal concrete {@link IntIntNullIntVertex}; its compute does nothing.
+   */
+  private static class MyIntIntNullVertex extends IntIntNullIntVertex {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  /**
+   * Writes a halted vertex that has two edges to a byte array, reads the
+   * bytes into a second vertex and checks that the edge count and the
+   * halted flag were carried over.
+   */
+  @Test
+  public void testSerialize() throws IOException {
+    List<Edge<IntWritable, NullWritable>> outEdges = Lists.newLinkedList();
+    for (int targetId : new int[] {3, 47}) {
+      outEdges.add(new Edge<IntWritable, NullWritable>(
+          new IntWritable(targetId), NullWritable.get()));
+    }
+
+    IntIntNullIntVertex original = new MyIntIntNullVertex();
+    original.initialize(new IntWritable(23), new IntWritable(7), outEdges);
+    original.voteToHalt();
+
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    DataOutput sinkOutput = new DataOutputStream(sink);
+    original.write(sinkOutput);
+
+    DataInput sourceInput = new DataInputStream(
+        new ByteArrayInputStream(sink.toByteArray()));
+    IntIntNullIntVertex restored = new MyIntIntNullVertex();
+    restored.readFields(sourceInput);
+
+    assertEquals(2, restored.getNumEdges());
+    assertEquals(true, restored.isHalted());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java
new file mode 100644
index 0000000..ead5578
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestMultiGraphVertex.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test all multigraph mutable vertices.
+ */
+public class TestMultiGraphVertex {
+  private Collection<Class<? extends Vertex<IntWritable, IntWritable,
+      IntWritable, IntWritable>>> vertexClasses = Lists.newArrayList();
+
+  public static class MyMultiGraphEdgeListVertex
+      extends MultiGraphEdgeListVertex<IntWritable, IntWritable, IntWritable,
+      IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException { }
+  }
+
+  public static class MyMultiGraphRepresentativeVertex
+      extends MultiGraphRepresentativeVertex<IntWritable, IntWritable,
+      IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException { }
+  }
+
+  @Before
+  public void setUp() {
+    vertexClasses.add(MyMultiGraphEdgeListVertex.class);
+    vertexClasses.add(MyMultiGraphRepresentativeVertex.class);
+  }
+
+  @Test
+  public void testAddRemoveEdges() {
+    for (Class<? extends Vertex<IntWritable, IntWritable,
+        IntWritable, IntWritable>> vertexClass : vertexClasses) {
+      testAddRemoveEdgesVertexClass(vertexClass);
+    }
+  }
+
+  private void testAddRemoveEdgesVertexClass(Class<? extends Vertex<IntWritable,
+      IntWritable, IntWritable, IntWritable>> vertexClass) {
+    MutableVertex<IntWritable, IntWritable, IntWritable,
+        IntWritable> vertex = instantiateVertex(vertexClass);
+
+    assertEquals(vertex.removeEdges(new IntWritable(1)), 0);
+
+    // We test a few different patterns for duplicate edges,
+    // in order to catch corner cases:
+
+    // Edge list of form: [A, B, A]
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+        new IntWritable(1)));
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
+        new IntWritable(2)));
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+        new IntWritable(10)));
+    assertEquals(vertex.getNumEdges(), 3);
+    assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
+    assertEquals(vertex.getNumEdges(), 1);
+
+    // Edge list of form: [A, B, B]
+    vertex = instantiateVertex(vertexClass);
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
+        new IntWritable(2)));
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+        new IntWritable(1)));
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+        new IntWritable(10)));
+    assertEquals(vertex.getNumEdges(), 3);
+    assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
+    assertEquals(vertex.getNumEdges(), 1);
+
+    // Edge list of form: [A, A, B]
+    vertex = instantiateVertex(vertexClass);
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+        new IntWritable(1)));
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+        new IntWritable(10)));
+    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
+        new IntWritable(2)));
+    assertEquals(vertex.getNumEdges(), 3);
+    assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
+    assertEquals(vertex.getNumEdges(), 1);
+  }
+
+  private MutableVertex<IntWritable, IntWritable, IntWritable, IntWritable>
+  instantiateVertex(Class<? extends Vertex<IntWritable, IntWritable,
+      IntWritable, IntWritable>> vertexClass) {
+    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
+    giraphConfiguration.setVertexClass(vertexClass);
+    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
+    MutableVertex<IntWritable, IntWritable, IntWritable,
+        IntWritable> vertex =
+        (MutableVertex<IntWritable, IntWritable,
+            IntWritable, IntWritable>)
+            immutableClassesGiraphConfiguration.createVertex();
+    vertex.initialize(new IntWritable(), new IntWritable());
+    return vertex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
new file mode 100644
index 0000000..9011200
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.*;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test all the mutable vertices (except multigraph versions)
+ */
+public class TestMutableVertex {
+  /** Number of timed repetitions of each serialization/deserialization */
+  public static final int REPS = 100;
+  /** Vertex classes to be tested, filled in by {@link #setUp()} */
+  private Collection<
+      Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable>>> vertexClasses = Lists.newArrayList();
+
+  /**
+   * Simple instantiable class that extends {@link HashMapVertex} with
+   * int ids, float vertex values, double edge values and long messages.
+   * Its compute body is empty.
+   */
+  public static class IFDLHashMapVertex extends
+      HashMapVertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  /**
+   * Simple instantiable class that extends {@link EdgeListVertex} with
+   * int ids, float vertex values, double edge values and long messages.
+   * Its compute body is empty.
+   */
+  public static class IFDLEdgeListVertex extends
+      EdgeListVertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  /**
+   * Simple instantiable class that extends {@link RepresentativeVertex}
+   * with int ids, float vertex values, double edge values and long
+   * messages. Its compute body is empty.
+   */
+  public static class IFDLRepresentativeVertex extends
+      RepresentativeVertex<IntWritable, FloatWritable, DoubleWritable,
+                LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  @Before
+  public void setUp() { // register every vertex implementation under test
+    vertexClasses.add(IFDLHashMapVertex.class);
+    vertexClasses.add(IFDLEdgeListVertex.class);
+    vertexClasses.add(IFDLRepresentativeVertex.class);
+  }
+
+  @Test // runs the instantiation check for each registered vertex class
+  public void testInstantiate() throws IOException {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testInstantiateVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Creates a vertex of the given class through a Giraph configuration and
+   * asserts that a vertex was actually created.
+   *
+   * @param vertexClass Vertex class to instantiate
+   * @return The created vertex, cast to a mutable vertex
+   */
+  private MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable> testInstantiateVertexClass(
+      Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+      LongWritable>> vertexClass) {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexClass(vertexClass);
+    ImmutableClassesGiraphConfiguration immutableConf =
+        new ImmutableClassesGiraphConfiguration(conf);
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable> created = (MutableVertex<IntWritable, FloatWritable,
+        DoubleWritable, LongWritable>) immutableConf.createVertex();
+    assertNotNull(created);
+    return created;
+  }
+
+  @Test // runs the edge check for each registered vertex class
+  public void testEdges() {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testEdgesVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Test a vertex class for edges: a vertex is initialized with 1000 edges
+   * whose value is twice the target id, every edge value is checked, and
+   * one edge is removed.
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testEdgesVertexClass(Class<? extends Vertex<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        testInstantiateVertexClass(vertexClass);
+
+    List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
+    for (int i = 1000; i > 0; --i) {
+      edges.add(new Edge<IntWritable, DoubleWritable>(
+          new IntWritable(i), new DoubleWritable(i * 2.0)));
+    }
+
+    vertex.initialize(null, null, edges);
+    // assertEquals takes (expected, actual); this order keeps failure
+    // messages from reporting the two values the wrong way round.
+    assertEquals(1000, vertex.getNumEdges());
+    for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) {
+      assertEquals(edge.getTargetVertexId().get() * 2.0d,
+          edge.getValue().get(), 0d);
+    }
+    assertEquals(1, vertex.removeEdges(new IntWritable(500)));
+    assertEquals(999, vertex.getNumEdges());
+  }
+
+  @Test // runs the edge lookup check for each registered vertex class
+  public void testGetEdges() {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testGetEdgesVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Test a vertex class for getting edges: a vertex is initialized with
+   * 1000 edges whose value is three times the target id, one edge is looked
+   * up and removed, and two other edges are looked up afterwards.
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testGetEdgesVertexClass(Class<? extends Vertex<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        testInstantiateVertexClass(vertexClass);
+
+    List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
+    for (int i = 1000; i > 0; --i) {
+      edges.add(new Edge<IntWritable, DoubleWritable>(
+          new IntWritable(i), new DoubleWritable(i * 3.0)));
+    }
+
+    vertex.initialize(null, null, edges);
+    // assertEquals takes (expected, actual); this order keeps failure
+    // messages from reporting the two values the wrong way round.
+    assertEquals(1000, vertex.getNumEdges());
+    assertEquals(new DoubleWritable(600 * 3.0),
+        vertex.getEdgeValue(new IntWritable(600)));
+    assertEquals(1, vertex.removeEdges(new IntWritable(600)));
+    assertEquals(999, vertex.getNumEdges());
+    // Edges other than the removed one must still be retrievable.
+    assertEquals(new DoubleWritable(500 * 3.0),
+        vertex.getEdgeValue(new IntWritable(500)));
+    assertEquals(new DoubleWritable(700 * 3.0),
+        vertex.getEdgeValue(new IntWritable(700)));
+  }
+
+  @Test // runs the add/remove edge check for each registered vertex class
+  public void testAddRemoveEdges() {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testAddRemoveEdgesVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Test a vertex class for adding/removing edges: four edges are added one
+   * at a time to a vertex without edges, lookups of present and absent
+   * targets are checked, and the edges are removed again one by one.
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testAddRemoveEdgesVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) {
+    MutableVertex<IntWritable,
+        FloatWritable, DoubleWritable, LongWritable> vertex =
+        testInstantiateVertexClass(vertexClass);
+
+    vertex.initialize(new IntWritable(0), new FloatWritable(0.0f));
+    // assertEquals takes (expected, actual); this order keeps failure
+    // messages from reporting the two values the wrong way round.
+    assertEquals(0, vertex.getNumEdges());
+    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+        new IntWritable(2),
+        new DoubleWritable(2.0))));
+    assertEquals(1, vertex.getNumEdges());
+    assertEquals(new DoubleWritable(2.0),
+        vertex.getEdgeValue(new IntWritable(2)));
+    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+        new IntWritable(4),
+        new DoubleWritable(4.0))));
+    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+        new IntWritable(3),
+        new DoubleWritable(3.0))));
+    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+        new IntWritable(1),
+        new DoubleWritable(1.0))));
+    assertEquals(4, vertex.getNumEdges());
+    // Target ids that were never added have no edge value.
+    assertNull(vertex.getEdgeValue(new IntWritable(5)));
+    assertNull(vertex.getEdgeValue(new IntWritable(0)));
+    for (Edge<IntWritable, DoubleWritable> edge : vertex.getEdges()) {
+      assertEquals(edge.getTargetVertexId().get() * 1.0d,
+          edge.getValue().get(), 0d);
+    }
+    assertEquals(1, vertex.removeEdges(new IntWritable(1)));
+    assertEquals(3, vertex.getNumEdges());
+    assertEquals(1, vertex.removeEdges(new IntWritable(3)));
+    assertEquals(2, vertex.getNumEdges());
+    assertEquals(1, vertex.removeEdges(new IntWritable(2)));
+    assertEquals(1, vertex.getNumEdges());
+    assertEquals(1, vertex.removeEdges(new IntWritable(4)));
+    assertEquals(0, vertex.getNumEdges());
+  }
+
+  @Test // runs the three serialization checks for each registered class
+  public void testSerialized() throws IOException {
+    for (Class<? extends Vertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable>> vertexClass : vertexClasses) {
+      testSerializeVertexClass(vertexClass);
+      testDynamicChannelBufferSerializeVertexClass(vertexClass);
+      testUnsafeSerializeVertexClass(vertexClass);
+    }
+  }
+
+  /**
+   * Build a vertex for testing: id 2, value 3.0 and 200 edges whose value
+   * is twice their target id, added from target id 200 down to 1.
+   *
+   * @param vertexClass Vertex class to use for testing
+   * @return Vertex that has some initial data
+   */
+  private MutableVertex<IntWritable,
+      FloatWritable, DoubleWritable, LongWritable> buildVertex(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) {
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable> result = testInstantiateVertexClass(vertexClass);
+
+    final int edgesCount = 200;
+    List<Edge<IntWritable, DoubleWritable>> initialEdges =
+        Lists.newArrayListWithCapacity(edgesCount);
+    int targetId = edgesCount;
+    while (targetId > 0) {
+      IntWritable target = new IntWritable(targetId);
+      DoubleWritable edgeValue = new DoubleWritable(targetId * 2.0);
+      initialEdges.add(new Edge<IntWritable, DoubleWritable>(target,
+          edgeValue));
+      --targetId;
+    }
+    result.initialize(new IntWritable(2), new FloatWritable(3.0f),
+        initialEdges);
+    return result;
+  }
+
+  /**
+   * Test a vertex class for serializing: the vertex is written to a byte
+   * array and read back REPS times each, the average time of both steps is
+   * printed, and the id, value and edges of the read vertex are compared
+   * with the written one.
+   *
+   * @param vertexClass Vertex class to check
+   */
+  private void testSerializeVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) {
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable> written = buildVertex(vertexClass);
+
+    // Time REPS serializations; the bytes of the last one are kept.
+    long serializeNanos = 0;
+    byte[] byteArray = null;
+    for (int rep = 0; rep < REPS; ++rep) {
+      long startNanos = SystemTime.get().getNanoseconds();
+      byteArray = WritableUtils.writeToByteArray(written);
+      serializeNanos +=
+          Times.getNanosecondsSince(SystemTime.get(), startNanos);
+    }
+    serializeNanos /= REPS;
+    float serializeBytesPerSec =
+        byteArray.length * 1f * Time.NS_PER_SECOND / serializeNanos;
+    System.out.println("testSerialize: Serializing took " + serializeNanos +
+        " ns for " + byteArray.length + " bytes " + serializeBytesPerSec +
+        " bytes / sec for " + vertexClass.getName());
+
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable,
+        LongWritable> readVertex = testInstantiateVertexClass(vertexClass);
+
+    // Time REPS deserializations into the same vertex instance.
+    long deserializeNanos = 0;
+    for (int rep = 0; rep < REPS; ++rep) {
+      long startNanos = SystemTime.get().getNanoseconds();
+      WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
+      deserializeNanos +=
+          Times.getNanosecondsSince(SystemTime.get(), startNanos);
+    }
+    deserializeNanos /= REPS;
+    float deserializeBytesPerSec =
+        byteArray.length * 1f * Time.NS_PER_SECOND / deserializeNanos;
+    System.out.println("testSerialize: Deserializing took " +
+        deserializeNanos + " ns for " + byteArray.length + " bytes " +
+        deserializeBytesPerSec + " bytes / sec for " +
+        vertexClass.getName());
+
+    assertEquals(written.getId(), readVertex.getId());
+    assertEquals(written.getValue(), readVertex.getValue());
+    assertEquals(Lists.newArrayList(written.getEdges()),
+        Lists.newArrayList(readVertex.getEdges()));
+  }
+
+  /**
+   * Round-trip a vertex through DynamicChannelBuffer(Input/Output)Stream,
+   * print the average time per repetition for each direction and check that
+   * the vertex read back has the same id, value and edges.
+   *
+   * @param vertexClass Vertex class to check
+   * @throws IOException Propagated from writing or reading the vertex
+   */
+  private void testDynamicChannelBufferSerializeVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) throws IOException {
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+        original = buildVertex(vertexClass);
+
+    // Write phase: a new output stream is created on every repetition and
+    // the last one is kept, since the read phase consumes its buffer.
+    DynamicChannelBufferOutputStream outputStream = null;
+    long writeNanos = 0;
+    for (int rep = 0; rep < REPS; rep++) {
+      long startNanos = SystemTime.get().getNanoseconds();
+      outputStream = new DynamicChannelBufferOutputStream(32);
+      original.write(outputStream);
+      writeNanos += Times.getNanosecondsSince(SystemTime.get(), startNanos);
+    }
+    writeNanos /= REPS;
+    String writeReport = "testDynamicChannelBufferSerializeVertexClass: " +
+        "Serializing took " + writeNanos + " ns for " +
+        outputStream.getDynamicChannelBuffer().writerIndex() + " bytes " +
+        (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
+            Time.NS_PER_SECOND / writeNanos) +
+        " bytes / sec for " + vertexClass.getName();
+    System.out.println(writeReport);
+
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+        restored = testInstantiateVertexClass(vertexClass);
+
+    // Read phase: every repetition re-reads the bytes of the last write.
+    long readNanos = 0;
+    for (int rep = 0; rep < REPS; rep++) {
+      long startNanos = SystemTime.get().getNanoseconds();
+      DynamicChannelBufferInputStream inputStream =
+          new DynamicChannelBufferInputStream(
+              outputStream.getDynamicChannelBuffer());
+      restored.readFields(inputStream);
+      readNanos += Times.getNanosecondsSince(SystemTime.get(), startNanos);
+      // Reset the reader index after the timed section, ready for the next
+      // repetition.
+      outputStream.getDynamicChannelBuffer().readerIndex(0);
+    }
+    readNanos /= REPS;
+    String readReport = "testDynamicChannelBufferSerializeVertexClass: " +
+        "Deserializing took " + readNanos + " ns for " +
+        outputStream.getDynamicChannelBuffer().writerIndex() + " bytes " +
+        (outputStream.getDynamicChannelBuffer().writerIndex() * 1f *
+            Time.NS_PER_SECOND / readNanos) +
+        " bytes / sec for " + vertexClass.getName();
+    System.out.println(readReport);
+
+    assertEquals(original.getId(), restored.getId());
+    assertEquals(original.getValue(), restored.getValue());
+    assertEquals(Lists.newArrayList(original.getEdges()),
+        Lists.newArrayList(restored.getEdges()));
+  }
+
+
+  /**
+   * Round-trip a vertex through UnsafeByteArray(Input/Output)Stream, print
+   * the average time per repetition for each direction and check that the
+   * vertex read back has the same id, value and edges.
+   *
+   * @param vertexClass Vertex class to check
+   * @throws IOException Propagated from writing or reading the vertex
+   */
+  private void testUnsafeSerializeVertexClass(Class<? extends
+      Vertex<IntWritable, FloatWritable, DoubleWritable,
+          LongWritable>> vertexClass) throws IOException {
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+        original = buildVertex(vertexClass);
+
+    // Write phase: a new output stream is created on every repetition and
+    // the last one is kept, since the read phase consumes its byte array.
+    UnsafeByteArrayOutputStream outputStream = null;
+    long writeNanos = 0;
+    for (int rep = 0; rep < REPS; rep++) {
+      long startNanos = SystemTime.get().getNanoseconds();
+      outputStream = new UnsafeByteArrayOutputStream(32);
+      original.write(outputStream);
+      writeNanos += Times.getNanosecondsSince(SystemTime.get(), startNanos);
+    }
+    writeNanos /= REPS;
+    String writeReport = "testUnsafeSerializeVertexClass: " +
+        "Serializing took " + writeNanos + " ns for " +
+        outputStream.getPos() + " bytes " +
+        (outputStream.getPos() * 1f * Time.NS_PER_SECOND / writeNanos) +
+        " bytes / sec for " + vertexClass.getName();
+    System.out.println(writeReport);
+
+    MutableVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+        restored = testInstantiateVertexClass(vertexClass);
+
+    // Read phase: each repetition wraps the written bytes, from offset 0 up
+    // to the output stream's position, in a fresh input stream.
+    long readNanos = 0;
+    for (int rep = 0; rep < REPS; rep++) {
+      long startNanos = SystemTime.get().getNanoseconds();
+      UnsafeByteArrayInputStream inputStream =
+          new UnsafeByteArrayInputStream(
+              outputStream.getByteArray(), 0, outputStream.getPos());
+      restored.readFields(inputStream);
+      readNanos += Times.getNanosecondsSince(SystemTime.get(), startNanos);
+    }
+    readNanos /= REPS;
+    String readReport = "testUnsafeSerializeVertexClass: " +
+        "Deserializing took " + readNanos + " ns for " +
+        outputStream.getPos() + " bytes " +
+        (outputStream.getPos() * 1f * Time.NS_PER_SECOND / readNanos) +
+        " bytes / sec for " + vertexClass.getName();
+    System.out.println(readReport);
+
+    assertEquals(original.getId(), restored.getId());
+    assertEquals(original.getValue(), restored.getValue());
+    assertEquals(Lists.newArrayList(original.getEdges()),
+        Lists.newArrayList(restored.getEdges()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java b/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
new file mode 100644
index 0000000..07acefb
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.GiraphTransferRegulator;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Test the GiraphTransferRegulator.
+ */
+public class TestGiraphTransferRegulator {
+  /** Job created in setUp() before every test */
+  private GiraphJob job;
+  /** Vertex under test, created when the test instance is constructed */
+  private IFDLEdgeListVertex vertex = new IFDLEdgeListVertex();
+
+  /**
+   * Minimal concrete {@link org.apache.giraph.graph.EdgeListVertex} with an
+   * empty compute(), so that it can be instantiated by the test.
+   */
+  public static class IFDLEdgeListVertex extends
+      EdgeListVertex<IntWritable, FloatWritable, DoubleWritable, LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException { }
+  }
+
+  /**
+   * Create the job and register the test vertex class in its configuration.
+   */
+  @Before
+  public void setUp() {
+    GiraphJob created;
+    try {
+      created = new GiraphJob("TestGiraphTransferRegulator");
+    } catch (IOException e) {
+      throw new RuntimeException("setUp: Failed", e);
+    }
+    job = created;
+    job.getConfiguration().setVertexClass(IFDLEdgeListVertex.class);
+  }
+
+  /**
+   * With the limits set to one vertex and three edges per transfer, the
+   * regulator must not ask for a transfer before anything was counted, and
+   * must ask for one once a single vertex with three edges was counted.
+   */
+  @Test
+  public void testGiraphTransferRegulator() {
+    job.getConfiguration()
+        .setInt(GiraphTransferRegulator.MAX_VERTICES_PER_TRANSFER, 1);
+    job.getConfiguration()
+        .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3);
+
+    // Three out-edges: targets 2, 3, 4 with edge values 22, 33, 44.
+    List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
+    for (int target = 2; target <= 4; target++) {
+      edges.add(new Edge<IntWritable, DoubleWritable>(
+          new IntWritable(target), new DoubleWritable(target * 11)));
+    }
+    vertex.initialize(null, null, edges);
+
+    GiraphTransferRegulator regulator =
+        new GiraphTransferRegulator(job.getConfiguration());
+    PartitionOwner partitionOwner = mock(PartitionOwner.class);
+    when(partitionOwner.getPartitionId()).thenReturn(57);
+
+    assertFalse(regulator.transferThisPartition(partitionOwner));
+    regulator.incrementCounters(partitionOwner, vertex);
+    assertTrue(regulator.transferThisPartition(partitionOwner));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
new file mode 100644
index 0000000..098899b
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.IntIntNullIntVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test case for partition stores.
+ */
+public class TestPartitionStores {
+  /** Configuration built in setUp() with MyVertex as the vertex class */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Mocked mapper context handed to partitions and partition stores */
+  private Mapper<?, ?, ?, ?>.Context context;
+
+  /**
+   * Concrete vertex with an empty compute(), used only as test data.
+   */
+  public static class MyVertex extends IntIntNullIntVertex {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {}
+  }
+
+  /**
+   * Create a test vertex whose id and value are both the given number.
+   *
+   * @param idAndValue Number used for the vertex id and the vertex value
+   * @return Newly initialized vertex
+   */
+  private static Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
+  createVertex(int idAndValue) {
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> vertex =
+        new MyVertex();
+    vertex.initialize(new IntWritable(idAndValue),
+        new IntWritable(idAndValue));
+    return vertex;
+  }
+
+  /**
+   * Create a partition through the configuration and put the given vertices
+   * into it.
+   *
+   * @param conf Configuration used to create the partition
+   * @param id Partition id
+   * @param vertices Vertices to put into the partition
+   * @return Partition holding the given vertices
+   */
+  private Partition<IntWritable, IntWritable, NullWritable,
+      IntWritable> createPartition(ImmutableClassesGiraphConfiguration conf,
+                                   Integer id,
+                                   Vertex<IntWritable, IntWritable,
+                                       NullWritable, IntWritable>... vertices) {
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition =
+        conf.createPartition(id, context);
+    for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v :
+        vertices) {
+      partition.putVertex(v);
+    }
+    return partition;
+  }
+
+  /**
+   * Build a fresh configuration and a mocked context before every test.
+   */
+  @Before
+  public void setUp() {
+    GiraphConfiguration configuration = new GiraphConfiguration();
+    configuration.setVertexClass(MyVertex.class);
+    conf = new ImmutableClassesGiraphConfiguration(configuration);
+    context = mock(Mapper.Context.class);
+  }
+
+  @Test
+  public void testSimplePartitionStore() {
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
+        NullWritable, IntWritable>(conf, context);
+    testReadWrite(partitionStore, conf);
+  }
+
+  /**
+   * Write a seven-vertex partition to an UnsafeByteArrayOutputStream, read it
+   * back into a new partition and compare id, edge count and vertex count.
+   *
+   * @throws IOException Propagated from writing or reading the partition
+   */
+  @Test
+  public void testUnsafePartitionSerializationClass() throws IOException {
+    // NOTE(review): conf was already wrapped in an
+    // ImmutableClassesGiraphConfiguration by setUp(); confirm that changing
+    // the partition class afterwards is still honored by createPartition().
+    conf.setPartitionClass(ByteArrayPartition.class);
+
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> partition =
+        createPartition(conf, 3, createVertex(1), createVertex(2),
+            createVertex(3), createVertex(4), createVertex(5),
+            createVertex(6), createVertex(7));
+    assertEquals(3, partition.getId());
+    assertEquals(0, partition.getEdgeCount());
+    assertEquals(7, partition.getVertexCount());
+
+    UnsafeByteArrayOutputStream outputStream = new
+        UnsafeByteArrayOutputStream();
+    partition.write(outputStream);
+    UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
+        outputStream.getByteArray(), 0, outputStream.getPos());
+    // Created with id -1 so that the id check below can only pass if
+    // readFields() restored the id from the stream.
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> deserializedPartition = conf.createPartition(-1,
+        context);
+    deserializedPartition.readFields(inputStream);
+
+    assertEquals(3, deserializedPartition.getId());
+    assertEquals(0, deserializedPartition.getEdgeCount());
+    assertEquals(7, deserializedPartition.getVertexCount());
+  }
+
+  /**
+   * Run the read/write scenario against the disk-backed store, first with
+   * one and then with two partitions allowed in memory.
+   */
+  @Test
+  public void testDiskBackedPartitionStore() {
+    conf.setBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH, true);
+    conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 1);
+
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new DiskBackedPartitionStore<IntWritable,
+                IntWritable, NullWritable, IntWritable>(conf, context);
+    testReadWrite(partitionStore, conf);
+
+    conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 2);
+    partitionStore = new DiskBackedPartitionStore<IntWritable,
+            IntWritable, NullWritable, IntWritable>(conf, context);
+    testReadWrite(partitionStore, conf);
+  }
+
+  /**
+   * Test reading/writing to/from a partition store.
+   *
+   * Partitions are added under the ids 1, 2, 2, 3, 1 and 4, so ids 1 and 2
+   * are each added twice; the vertex counts asserted below expect the store
+   * to accumulate the vertices of both additions under the same id.
+   *
+   * @param partitionStore Partition store to test
+   * @param conf Configuration to use
+   */
+  public void testReadWrite(
+      PartitionStore<IntWritable, IntWritable,
+          NullWritable, IntWritable> partitionStore,
+      ImmutableClassesGiraphConfiguration conf) {
+    // Vertex ids 1..7 are all distinct (the sixth vertex previously reused
+    // id 7, which duplicated the seventh one).
+    partitionStore.addPartition(
+        createPartition(conf, 1, createVertex(1), createVertex(2)));
+    partitionStore.addPartition(createPartition(conf, 2, createVertex(3)));
+    partitionStore.addPartition(createPartition(conf, 2, createVertex(4)));
+    partitionStore.addPartition(createPartition(conf, 3, createVertex(5)));
+    partitionStore.addPartition(createPartition(conf, 1, createVertex(6)));
+    partitionStore.addPartition(createPartition(conf, 4, createVertex(7)));
+
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
+        partitionStore.getPartition(1);
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 =
+        partitionStore.getPartition(2);
+    // Partition 3 is removed from the store, not just looked up.
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> partition3 = partitionStore.removePartition(3);
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 =
+        partitionStore.getPartition(4);
+
+    assertEquals(3, partitionStore.getNumPartitions());
+    assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
+    assertEquals(3, Iterables.size(partitionStore.getPartitions()));
+    assertTrue(partitionStore.hasPartition(1));
+    assertTrue(partitionStore.hasPartition(2));
+    assertFalse(partitionStore.hasPartition(3));
+    assertTrue(partitionStore.hasPartition(4));
+    assertEquals(3, partition1.getVertexCount());
+    assertEquals(2, partition2.getVertexCount());
+    assertEquals(1, partition3.getVertexCount());
+    assertEquals(1, partition4.getVertexCount());
+
+    partitionStore.deletePartition(2);
+
+    assertEquals(2, partitionStore.getNumPartitions());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
new file mode 100644
index 0000000..36f73ab
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Tests for the adjacency-list text vertex writer. The test class extends
+ * the output format under test and swaps the line record writer for a mock.
+ */
+public class TestAdjacencyListTextVertexOutputFormat extends
+    AdjacencyListTextVertexOutputFormat<Text, DoubleWritable, DoubleWritable> {
+
+  /**
+   * Create a vertex writer whose line record writer is the supplied one.
+   *
+   * @param tw Record writer that receives the lines
+   * @return Vertex writer that writes to tw
+   */
+  protected AdjacencyListTextVertexWriter createVertexWriter(
+      final RecordWriter<Text, Text> tw) {
+    return new AdjacencyListTextVertexWriter() {
+      @Override
+      protected RecordWriter<Text, Text> createLineRecordWriter(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        return tw;
+      }
+    };
+  }
+
+  /**
+   * Write a mocked "San Francisco" vertex (value 0, edges to Los Angeles and
+   * Phoenix) with the given configuration and verify the line that reaches
+   * the record writer, as well as that the edges were asked for once.
+   *
+   * @param conf Configuration returned by the mocked task attempt context
+   * @param expectedLine Line expected to be written
+   */
+  private void checkSanFranciscoLine(Configuration conf, String expectedLine)
+    throws IOException, InterruptedException {
+    TaskAttemptContext attemptContext = mock(TaskAttemptContext.class);
+    when(attemptContext.getConfiguration()).thenReturn(conf);
+
+    Vertex vertex = mock(Vertex.class);
+    when(vertex.getId()).thenReturn(new Text("San Francisco"));
+    when(vertex.getValue()).thenReturn(new DoubleWritable(0d));
+    when(vertex.getTotalNumEdges()).thenReturn(2L);
+    ArrayList<Edge<Text, DoubleWritable>> cities =
+        new ArrayList<Edge<Text, DoubleWritable>>();
+    Collections.addAll(cities,
+        new Edge<Text, DoubleWritable>(
+            new Text("Los Angeles"), new DoubleWritable(347.16)),
+        new Edge<Text, DoubleWritable>(
+            new Text("Phoenix"), new DoubleWritable(652.48)));
+    when(vertex.getEdges()).thenReturn(cities);
+
+    RecordWriter<Text, Text> lineWriter = mock(RecordWriter.class);
+    AdjacencyListTextVertexWriter vertexWriter =
+        createVertexWriter(lineWriter);
+    vertexWriter.initialize(attemptContext);
+    vertexWriter.writeVertex(vertex);
+
+    verify(lineWriter).write(new Text(expectedLine), null);
+    verify(vertex, times(1)).getEdges();
+  }
+
+  @Test
+  public void testVertexWithNoEdges() throws IOException, InterruptedException {
+    TaskAttemptContext attemptContext = mock(TaskAttemptContext.class);
+    when(attemptContext.getConfiguration()).thenReturn(new Configuration());
+
+    Vertex vertex = mock(Vertex.class);
+    when(vertex.getId()).thenReturn(new Text("The Beautiful South"));
+    when(vertex.getValue()).thenReturn(new DoubleWritable(32.2d));
+    // An empty list stands for a vertex without edges
+    when(vertex.getEdges()).thenReturn(new ArrayList<Text>());
+
+    RecordWriter<Text, Text> lineWriter = mock(RecordWriter.class);
+    AdjacencyListTextVertexWriter vertexWriter =
+        createVertexWriter(lineWriter);
+    vertexWriter.initialize(attemptContext);
+    vertexWriter.writeVertex(vertex);
+
+    verify(lineWriter).write(new Text("The Beautiful South\t32.2"), null);
+    verify(vertex, times(1)).getEdges();
+    verify(vertex, times(0)).getEdgeValue(Matchers.<WritableComparable>any());
+  }
+
+  @Test
+  public void testVertexWithEdges() throws IOException, InterruptedException {
+    checkSanFranciscoLine(new Configuration(),
+        "San Francisco\t0.0\tLos Angeles\t347.16\t" +
+            "Phoenix\t652.48");
+  }
+
+  @Test
+  public void testWithDifferentDelimiter() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(AdjacencyListTextVertexOutputFormat.LINE_TOKENIZE_VALUE, ":::");
+    checkSanFranciscoLine(conf,
+        "San Francisco:::0.0:::Los Angeles:::347.16:::" +
+            "Phoenix:::652.48");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
new file mode 100644
index 0000000..17511c4
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * Tests for the id-with-value text vertex writer. The test class extends the
+ * output format under test and swaps the line record writer for a mock.
+ */
+public class TestIdWithValueTextOutputFormat extends
+    IdWithValueTextOutputFormat<Text, DoubleWritable, Writable> {
+  @Test
+  public void testHappyPath() throws IOException, InterruptedException {
+    idWithValueTestWorker(new Configuration(), new Text("Four Tops\t4.0"));
+  }
+
+  @Test
+  public void testReverseIdAndValue() throws IOException, InterruptedException {
+    Configuration reversed = new Configuration();
+    reversed.setBoolean(REVERSE_ID_AND_VALUE, true);
+    idWithValueTestWorker(reversed, new Text("4.0\tFour Tops"));
+  }
+
+  @Test
+  public void testWithDifferentDelimiter()  throws IOException,
+      InterruptedException {
+    Configuration delimited = new Configuration();
+    delimited.set(LINE_TOKENIZE_VALUE, "blah");
+    idWithValueTestWorker(delimited, new Text("Four Topsblah4.0"));
+  }
+
+  /**
+   * Write a mocked "Four Tops" vertex with value 4 using the given
+   * configuration, then verify the line handed to the record writer and
+   * that the writer never touched the edges of the vertex.
+   *
+   * @param conf Configuration returned by the mocked task attempt context
+   * @param expected Line expected to be written
+   */
+  private void idWithValueTestWorker(Configuration conf, Text expected)
+      throws IOException, InterruptedException {
+    TaskAttemptContext attemptContext = mock(TaskAttemptContext.class);
+    when(attemptContext.getConfiguration()).thenReturn(conf);
+
+    Vertex vertex = mock(Vertex.class);
+    when(vertex.getId()).thenReturn(new Text("Four Tops"));
+    when(vertex.getValue()).thenReturn(new DoubleWritable(4d));
+    // Stubbed with an empty list; the verifications below expect that the
+    // writer never asks for the edges at all.
+    when(vertex.getEdges()).thenReturn(new ArrayList<Text>());
+
+    final RecordWriter<Text, Text> lineWriter = mock(RecordWriter.class);
+    IdWithValueVertexWriter vertexWriter = new IdWithValueVertexWriter() {
+      @Override
+      protected RecordWriter<Text, Text> createLineRecordWriter(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        return lineWriter;
+      }
+    };
+    vertexWriter.initialize(attemptContext);
+    vertexWriter.writeVertex(vertex);
+
+    verify(lineWriter).write(expected, null);
+    verify(vertex, times(0)).getEdges();
+    verify(vertex, times(0)).getEdgeValue(Matchers.<WritableComparable>any());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
new file mode 100644
index 0000000..b03c8e4
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.giraph.io.TestTextDoubleDoubleAdjacencyListVertexInputFormat.assertValidVertex;
+import static org.apache.giraph.io.TestTextDoubleDoubleAdjacencyListVertexInputFormat.setGraphState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends
+    LongDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
+
+  private RecordReader<LongWritable, Text> rr;
+  private Configuration conf;
+  private TaskAttemptContext tac;
+  private GraphState<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> graphState;
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    rr = mock(RecordReader.class);
+    when(rr.nextKeyValue()).thenReturn(true);
+    conf = new Configuration();
+    conf.setClass(GiraphConstants.VERTEX_CLASS, DummyVertex.class, Vertex.class);
+    conf.setClass(GiraphConstants.VERTEX_ID_CLASS, LongWritable.class, Writable.class);
+    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class);
+    graphState = mock(GraphState.class);
+    tac = mock(TaskAttemptContext.class);
+    when(tac.getConfiguration()).thenReturn(conf);
+  }
+
+  protected TextVertexReader createVertexReader(
+       RecordReader<LongWritable, Text> rr) {
+    return createVertexReader(rr, null);
+  }
+
+  protected TextVertexReader createVertexReader(
+      final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) {
+    return new LongDoubleDoubleAdjacencyListVertexReader(lineSanitizer) {
+      @Override
+      protected RecordReader<LongWritable, Text> createLineRecordReader(
+          InputSplit inputSplit, TaskAttemptContext context)
+          throws IOException, InterruptedException {
+        return rr;
+      }
+    };
+  }
+
+  @Test
+  public void testIndexMustHaveValue() throws IOException, InterruptedException {
+    String input = "123";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr);
+
+
+    vr.initialize(null, tac);
+
+    try {
+      vr.nextVertex();
+      vr.getCurrentVertex();
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().startsWith("Line did not split correctly: "));
+    }
+  }
+
+  @Test
+  public void testEdgesMustHaveValues() throws IOException, InterruptedException {
+    String input = "99\t55.2\t100";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr);
+
+
+    vr.initialize(null, tac);
+
+    try {
+      vr.nextVertex();
+      vr.getCurrentVertex();
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().startsWith("Line did not split correctly: "));
+    }
+  }
+
+  @Test
+  public void testHappyPath() throws Exception {
+    // Line layout: vertex id, vertex value, then (target id, edge value) pairs.
+    String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr);
+
+    vr.initialize(null, tac);
+
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    Vertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable>
+        vertex = vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex,
+        new LongWritable(42), new DoubleWritable(0.1),
+        new Edge<LongWritable, DoubleWritable>(new LongWritable(99), new DoubleWritable(0.2)),
+        new Edge<LongWritable, DoubleWritable>(new LongWritable(2000), new DoubleWritable(0.3)),
+        new Edge<LongWritable, DoubleWritable>(new LongWritable(4000), new DoubleWritable(0.4)));
+    assertEquals(3, vertex.getNumEdges());
+  }
+
+  @Test
+  public void testDifferentSeparators() throws Exception {
+    // Tokens are separated by ":" here rather than by tabs.
+    String input = "12345:42.42:9999999:99.9";
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":");
+    TextVertexReader vr = createVertexReader(rr);
+
+    vr.initialize(null, tac);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    Vertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable>
+        vertex = vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex, new LongWritable(12345), new DoubleWritable(42.42),
+       new Edge<LongWritable, DoubleWritable>(new LongWritable(9999999), new DoubleWritable(99.9)));
+    assertEquals(1, vertex.getNumEdges());
+  }
+
+  public static class DummyVertex
+      extends EdgeListVertex<LongWritable, DoubleWritable,
+      DoubleWritable, BooleanWritable> {
+    @Override
+    public void compute(Iterable<BooleanWritable> messages) throws IOException {
+      // ignore
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
new file mode 100644
index 0000000..c447598
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends
+    TextDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
+
+  private RecordReader<LongWritable, Text> rr;
+  private Configuration conf;
+  private TaskAttemptContext tac;
+  private GraphState<Text, DoubleWritable, DoubleWritable, BooleanWritable> graphState;
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    rr = mock(RecordReader.class);
+    when(rr.nextKeyValue()).thenReturn(true).thenReturn(false);
+    conf = new Configuration();
+    conf.setClass(GiraphConstants.VERTEX_CLASS, DummyVertex.class, Vertex.class);
+    conf.setClass(GiraphConstants.VERTEX_ID_CLASS, Text.class, Writable.class);
+    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class);
+    graphState = mock(GraphState.class);
+    tac = mock(TaskAttemptContext.class);
+    when(tac.getConfiguration()).thenReturn(conf);
+  }
+
+  protected TextVertexReader createVertexReader(
+       RecordReader<LongWritable, Text> rr) {
+    return createVertexReader(rr, null);
+  }
+
+  protected TextVertexReader createVertexReader(
+      final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) {
+    return new TextDoubleDoubleAdjacencyListVertexReader(lineSanitizer) {
+      @Override
+      protected RecordReader<LongWritable, Text> createLineRecordReader(
+          InputSplit inputSplit, TaskAttemptContext context)
+          throws IOException, InterruptedException {
+        return rr;
+      }
+    };
+  }
+
+  @Test
+  public void testIndexMustHaveValue() throws IOException, InterruptedException {
+    String input = "hi";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr);
+
+    vr.initialize(null, tac);
+
+    try {
+      vr.nextVertex();
+      vr.getCurrentVertex();
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().startsWith("Line did not split correctly: "));
+    }
+  }
+
+  @Test
+  public void testEdgesMustHaveValues() throws IOException, InterruptedException {
+    String input = "index\t55.66\tindex2";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr);
+    vr.initialize(null, tac);
+    try {
+      vr.nextVertex();
+      vr.getCurrentVertex();
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().startsWith("Line did not split correctly: "));
+    }
+  }
+
+  public static void setGraphState(Vertex vertex, GraphState graphState) throws Exception {
+    Class<? extends Vertex> c = Vertex.class;
+    Method m = c.getDeclaredMethod("setGraphState", GraphState.class);
+    m.setAccessible(true);
+    m.invoke(vertex, graphState);
+  }
+
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable> void assertValidVertex(Configuration conf,
+      GraphState<I, V, E, M> graphState, Vertex<I, V, E, M> actual,
+      I expectedId, V expectedValue, Edge<I, E>... edges)
+      throws Exception {
+    Vertex<I, V, E, M> expected = BspUtils.createVertex(conf);
+    setGraphState(expected, graphState);
+    expected.initialize(expectedId, expectedValue, Arrays.asList(edges));
+    assertValid(expected, actual);
+  }
+
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> void
+  assertValid(Vertex<I, V, E, M> expected, Vertex<I, V, E, M> actual) {
+    assertEquals(expected.getId(), actual.getId());
+    assertEquals(expected.getValue(), actual.getValue());
+    assertEquals(expected.getTotalNumEdges(), actual.getTotalNumEdges());
+    List<Edge<I, E>> expectedEdges = Lists.newArrayList();
+    List<Edge<I, E>> actualEdges = Lists.newArrayList();
+    Iterables.addAll(actualEdges, actual.getEdges());
+    Iterables.addAll(expectedEdges, expected.getEdges());
+    Collections.sort(expectedEdges);
+    Collections.sort(actualEdges);
+    // Compare the sorted lists as a whole: this checks the edge count as well
+    // as every edge, so extra or missing edges on "actual" fail the assertion.
+    assertEquals(expectedEdges, actualEdges);
+  }
+
+  @Test
+  public void testHappyPath() throws Exception {
+    // Line layout: vertex id, vertex value, then (target id, edge value) pairs.
+    String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345";
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr);
+
+    vr.initialize(null, tac);
+    assertTrue("Should have been able to add a vertex", vr.nextVertex());
+    Vertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
+        vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex, new Text("Hi"), new DoubleWritable(0),
+        new Edge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)),
+        new Edge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)),
+        new Edge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d)));
+    assertEquals(3, vertex.getNumEdges());
+  }
+
+  @Test
+  public void testLineSanitizer() throws Exception {
+    String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001";
+    // Upper-cases every line, so the ids are expected back in upper case.
+    AdjacencyListTextVertexInputFormat.LineSanitizer toUpper =
+        new AdjacencyListTextVertexInputFormat.LineSanitizer() {
+      @Override
+      public String sanitize(String s) {
+        return s.toUpperCase();
+      }
+    };
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    TextVertexReader vr = createVertexReader(rr, toUpper);
+
+    vr.initialize(null, tac);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    Vertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
+        vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex,
+        new Text("BYE"), new DoubleWritable(0.01d),
+        new Edge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)),
+        new Edge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)),
+        new Edge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d)));
+
+    assertEquals(3, vertex.getNumEdges());
+  }
+
+  @Test
+  public void testDifferentSeparators() throws Exception {
+    // Tokens are separated by ":" here rather than by tabs.
+    String input = "alpha:42:beta:99";
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":");
+    TextVertexReader vr = createVertexReader(rr);
+
+    vr.initialize(null, tac);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    Vertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
+        vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex, new Text("alpha"), new DoubleWritable(42d),
+        new Edge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d)));
+    assertEquals(1, vertex.getNumEdges());
+  }
+
+  public static class DummyVertex
+      extends EdgeListVertex<Text, DoubleWritable,
+      DoubleWritable, BooleanWritable> {
+    @Override
+    public void compute(Iterable<BooleanWritable> messages) throws IOException {
+      // ignore
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/utils/BspUtilsTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/BspUtilsTest.java b/giraph-core/src/test/java/org/apache/giraph/utils/BspUtilsTest.java
new file mode 100644
index 0000000..68f6553
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/BspUtilsTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Benchmark tests to ensure that object creation is fast
+ */
+public class BspUtilsTest {
+  @Rule
+  public TestName name = new TestName();
+  private static final Time TIME = SystemTime.get();
+  private static final long COUNT = 200000;
+  private Configuration conf = new Configuration();
+  private long startNanos = -1;
+  private long totalNanos = -1;
+  private long total = 0;
+  private long expected = COUNT * (COUNT - 1) / 2L;
+  private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+      LongWritable, LongWritable> configuration;
+
+  @Before
+  public void setUp() {
+    conf.setClass(GiraphConstants.VERTEX_ID_CLASS, IntWritable.class,
+        WritableComparable.class);
+    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS, LongWritable.class,
+        Writable.class);
+    conf.setClass(GiraphConstants.EDGE_VALUE_CLASS, DoubleWritable.class,
+        Writable.class);
+    conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS, LongWritable.class,
+        Writable.class);
+    // Named differently from the "conf" field so the field is not shadowed.
+    GiraphConfiguration giraphConf = new GiraphConfiguration();
+    giraphConf.setVertexClass(ImmutableVertex.class);
+    configuration = new ImmutableClassesGiraphConfiguration<LongWritable,
+        LongWritable, LongWritable, LongWritable>(giraphConf);
+    total = 0;
+    System.gc();
+  }
+
+  @After
+  public void cleanUp() {
+    totalNanos = Times.getNanosSince(TIME, startNanos);
+    System.out.println(name.getMethodName() + ": took "
+        + totalNanos +
+        " ns for " + COUNT + " elements " + (totalNanos * 1f / COUNT) +
+        " ns / element");
+    assertEquals(expected, total);
+    System.gc();
+  }
+
+  @Test
+  public void testCreateClass() {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = BspUtils.createVertexValue(conf);
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  @Test
+  public void testNativeCreateClass() {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = new LongWritable();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  private Class<?> getLongWritableClass() {
+    return LongWritable.class;
+  }
+
+  @Test
+  public void testNewInstance()
+      throws IllegalAccessException, InstantiationException {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable)
+          getLongWritableClass().newInstance();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  private synchronized Class<?> getSyncLongWritableClass() {
+    return LongWritable.class;
+  }
+
+  @Test
+  public void testSyncNewInstance()
+      throws IllegalAccessException, InstantiationException {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable)
+          getSyncLongWritableClass().newInstance();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  @Test
+  public void testReflectionUtilsNewInstance()
+      throws IllegalAccessException, InstantiationException {
+    // Throwaway to put into cache
+    org.apache.hadoop.util.ReflectionUtils.newInstance(LongWritable.class,
+        null);
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable)
+          org.apache.hadoop.util.ReflectionUtils.newInstance(
+              getLongWritableClass(), null);
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  @Test
+  public void testConstructorNewInstance()
+      throws IllegalAccessException, InstantiationException,
+      NoSuchMethodException, InvocationTargetException {
+    // Looked up once, outside the timed loop; the no-arg form needs no array.
+    Constructor<?> constructor = LongWritable.class.getDeclaredConstructor();
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = (LongWritable) constructor.newInstance();
+      value.set(i);
+      total += value.get();
+    }
+  }
+
+  private static class ImmutableVertex extends EdgeListVertex<LongWritable,
+      LongWritable, LongWritable, LongWritable> {
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException {
+    }
+  }
+
+  private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
+      LongWritable, LongWritable> getConfiguration() {
+    return configuration;
+  }
+
+  @Test
+  public void testImmutableClassesGiraphConfigurationNewInstance() {
+    startNanos = TIME.getNanoseconds();
+    for (int i = 0; i < COUNT; ++i) {
+      LongWritable value = getConfiguration().createVertexValue();
+      value.set(i);
+      total += value.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/utils/ComparisonUtilsTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/ComparisonUtilsTest.java b/giraph-core/src/test/java/org/apache/giraph/utils/ComparisonUtilsTest.java
new file mode 100644
index 0000000..789c9ee
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/ComparisonUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ComparisonUtilsTest {
+
+    @Test
+    public void testEquality() {
+        Iterable<String> one = Lists.newArrayList("one", "two", "three");
+        Iterable<String> two = Lists.newArrayList("one", "two", "three");
+
+        assertTrue(ComparisonUtils.equal(one, one));
+        assertTrue(ComparisonUtils.equal(one, two));
+        assertTrue(ComparisonUtils.equal(two, two));
+        assertTrue(ComparisonUtils.equal(two, one));
+    }
+
+    @Test
+    public void testEqualityEmpty() {
+        Iterable<String> one = Lists.newArrayList();
+        Iterable<String> two = Lists.newArrayList();
+
+        assertTrue(ComparisonUtils.equal(one, one));
+        assertTrue(ComparisonUtils.equal(one, two));
+        assertTrue(ComparisonUtils.equal(two, two));
+        assertTrue(ComparisonUtils.equal(two, one));
+    }
+
+    @Test
+    public void testInEquality() {
+        Iterable<String> one = Lists.newArrayList("one", "two", "three");
+        Iterable<String> two = Lists.newArrayList("two", "three", "four");
+        Iterable<String> three = Lists.newArrayList();
+
+        assertFalse(ComparisonUtils.equal(one, two));
+        assertFalse(ComparisonUtils.equal(one, three));
+        assertFalse(ComparisonUtils.equal(two, one));
+        assertFalse(ComparisonUtils.equal(two, three));
+        assertFalse(ComparisonUtils.equal(three, one));
+        assertFalse(ComparisonUtils.equal(three, two));
+    }
+
+    @Test
+    public void testInEqualityDifferentLengths() {
+        Iterable<String> one = Lists.newArrayList("one", "two", "three");
+        Iterable<String> two = Lists.newArrayList("one", "two", "three", "four");
+
+        assertFalse(ComparisonUtils.equal(one, two));
+        assertFalse(ComparisonUtils.equal(two, one));
+    }
+
+}


Mime
View raw message