giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [28/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:32 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
new file mode 100644
index 0000000..08ffe24
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.junit.Test;
+
+public class TestDoubleAggregators {
+
+  @Test
+  public void testMaxAggregator() {
+    DoubleMaxAggregator max = new DoubleMaxAggregator();
+    max.aggregate(new DoubleWritable(2.0));
+    max.aggregate(new DoubleWritable(3.0));
+    assertEquals(3.0, max.getAggregatedValue().get(), 0d);
+    max.setAggregatedValue(new DoubleWritable(1.0));
+    assertEquals(1.0, max.getAggregatedValue().get(), 0d);
+    DoubleWritable dw = max.createInitialValue();
+    assertNotNull(dw);
+  }
+
+  @Test
+  public void testMinAggregator() {
+    DoubleMinAggregator min = new DoubleMinAggregator();
+    min.aggregate(new DoubleWritable(3.0));
+    min.aggregate(new DoubleWritable(2.0));
+    assertEquals(2.0, min.getAggregatedValue().get(), 0d);
+    min.setAggregatedValue(new DoubleWritable(3.0));
+    assertEquals(3.0, min.getAggregatedValue().get(), 0d);
+    DoubleWritable dw = min.createInitialValue();
+    assertNotNull(dw);
+  }
+
+  @Test
+  public void testOverwriteAggregator() {
+    DoubleOverwriteAggregator overwrite = new DoubleOverwriteAggregator();
+    overwrite.aggregate(new DoubleWritable(1.0));
+    assertEquals(1.0, overwrite.getAggregatedValue().get(), 0d);
+    overwrite.aggregate(new DoubleWritable(2.0));
+    assertEquals(2.0, overwrite.getAggregatedValue().get(), 0d);
+    overwrite.setAggregatedValue(new DoubleWritable(3.0));
+    assertEquals(3.0, overwrite.getAggregatedValue().get(), 0d);
+    DoubleWritable dw = overwrite.createInitialValue();
+    assertNotNull(dw);
+  }
+
+  @Test
+  public void testProductAggregator() {
+    DoubleProductAggregator product = new DoubleProductAggregator();
+    product.aggregate(new DoubleWritable(6.0));
+    product.aggregate(new DoubleWritable(7.0));
+    assertEquals(42.0, product.getAggregatedValue().get(), 0d);
+    product.setAggregatedValue(new DoubleWritable(1.0));
+    assertEquals(1.0, product.getAggregatedValue().get(), 0d);
+    DoubleWritable dw = product.createInitialValue();
+    assertNotNull(dw);
+  }
+
+  @Test
+  public void testSumAggregator() {
+    DoubleSumAggregator sum = new DoubleSumAggregator();
+    sum.aggregate(new DoubleWritable(1.0));
+    sum.aggregate(new DoubleWritable(2.0));
+    assertEquals(3.0, sum.getAggregatedValue().get(), 0d);
+    sum.setAggregatedValue(new DoubleWritable(4.0));
+    assertEquals(4.0, sum.getAggregatedValue().get(), 0d);
+    DoubleWritable dw = sum.createInitialValue();
+    assertNotNull(dw);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
new file mode 100644
index 0000000..214cd62
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.junit.Test;
+
+public class TestFloatAggregators {
+
+  @Test
+  public void testMaxAggregator() {
+    FloatMaxAggregator max = new FloatMaxAggregator();
+    max.aggregate(new FloatWritable(2.0f));
+    max.aggregate(new FloatWritable(3.0f));
+    assertEquals(3.0f, max.getAggregatedValue().get(), 0f);
+    max.setAggregatedValue(new FloatWritable(1.0f));
+    assertEquals(1.0f, max.getAggregatedValue().get(), 0f);
+    FloatWritable fw = max.createInitialValue();
+    assertNotNull(fw);
+  }
+
+  @Test
+  public void testMinAggregator() {
+    FloatMinAggregator min = new FloatMinAggregator();
+    min.aggregate(new FloatWritable(3.0f));
+    min.aggregate(new FloatWritable(2.0f));
+    assertEquals(2.0f, min.getAggregatedValue().get(), 0f);
+    min.setAggregatedValue(new FloatWritable(3.0f));
+    assertEquals(3.0f, min.getAggregatedValue().get(), 0f);
+    FloatWritable fw = min.createInitialValue();
+    assertNotNull(fw);
+  }
+
+  @Test
+  public void testOverwriteAggregator() {
+    FloatOverwriteAggregator overwrite = new FloatOverwriteAggregator();
+    overwrite.aggregate(new FloatWritable(1.0f));
+    assertEquals(1.0f, overwrite.getAggregatedValue().get(), 0f);
+    overwrite.aggregate(new FloatWritable(2.0f));
+    assertEquals(2.0f, overwrite.getAggregatedValue().get(), 0f);
+    overwrite.setAggregatedValue(new FloatWritable(3.0f));
+    assertEquals(3.0f, overwrite.getAggregatedValue().get(), 0f);
+    FloatWritable fw = overwrite.createInitialValue();
+    assertNotNull(fw);
+  }
+
+  @Test
+  public void testProductAggregator() {
+    FloatProductAggregator product = new FloatProductAggregator();
+    product.aggregate(new FloatWritable(6.0f));
+    product.aggregate(new FloatWritable(7.0f));
+    assertEquals(42.0f, product.getAggregatedValue().get(), 0f);
+    product.setAggregatedValue(new FloatWritable(1.0f));
+    assertEquals(1.0f, product.getAggregatedValue().get(), 0f);
+    FloatWritable fw = product.createInitialValue();
+    assertNotNull(fw);
+  }
+
+  @Test
+  public void testSumAggregator() {
+    FloatSumAggregator sum = new FloatSumAggregator();
+    sum.aggregate(new FloatWritable(1.0f));
+    sum.aggregate(new FloatWritable(2.0f));
+    assertEquals(3.0f, sum.getAggregatedValue().get(), 0f);
+    sum.setAggregatedValue(new FloatWritable(4.0f));
+    assertEquals(4.0f, sum.getAggregatedValue().get(), 0f);
+    FloatWritable fw = sum.createInitialValue();
+    assertNotNull(fw);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
new file mode 100644
index 0000000..5ddd220
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+public class TestIntAggregators {
+
+  @Test
+  public void testMaxAggregator() {
+    IntMaxAggregator max = new IntMaxAggregator();
+    max.aggregate(new IntWritable(2));
+    max.aggregate(new IntWritable(3));
+    assertEquals(3, max.getAggregatedValue().get());
+    max.setAggregatedValue(new IntWritable(1));
+    assertEquals(1, max.getAggregatedValue().get());
+    IntWritable iw = max.createInitialValue();
+    assertNotNull(iw);
+  }
+
+  @Test
+  public void testMinAggregator() {
+    IntMinAggregator min = new IntMinAggregator();
+    min.aggregate(new IntWritable(3));
+    min.aggregate(new IntWritable(2));
+    assertEquals(2, min.getAggregatedValue().get());
+    min.setAggregatedValue(new IntWritable(3));
+    assertEquals(3, min.getAggregatedValue().get());
+    IntWritable iw = min.createInitialValue();
+    assertNotNull(iw);
+  }
+
+  @Test
+  public void testOverwriteAggregator() {
+    IntOverwriteAggregator overwrite = new IntOverwriteAggregator();
+    overwrite.aggregate(new IntWritable(1));
+    assertEquals(1, overwrite.getAggregatedValue().get());
+    overwrite.aggregate(new IntWritable(2));
+    assertEquals(2, overwrite.getAggregatedValue().get());
+    overwrite.setAggregatedValue(new IntWritable(3));
+    assertEquals(3, overwrite.getAggregatedValue().get());
+    IntWritable iw = overwrite.createInitialValue();
+    assertNotNull(iw);
+  }
+  
+  @Test
+  public void testProductAggregator() {
+    IntProductAggregator product = new IntProductAggregator();
+    product.aggregate(new IntWritable(6));
+    product.aggregate(new IntWritable(7));
+    assertEquals(42, product.getAggregatedValue().get());
+    product.setAggregatedValue(new IntWritable(1));
+    assertEquals(1, product.getAggregatedValue().get());
+    IntWritable iw = product.createInitialValue();
+    assertNotNull(iw);
+  }
+
+  @Test
+  public void testSumAggregator() {
+    IntSumAggregator sum = new IntSumAggregator();
+    sum.aggregate(new IntWritable(1));
+    sum.aggregate(new IntWritable(2));
+    assertEquals(3, sum.getAggregatedValue().get());
+    sum.setAggregatedValue(new IntWritable(4));
+    assertEquals(4, sum.getAggregatedValue().get());
+    IntWritable iw = sum.createInitialValue();
+    assertNotNull(iw);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java
new file mode 100644
index 0000000..4838ced
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+public class TestLongAggregators {
+
+  @Test
+  public void testMaxAggregator() {
+    LongMaxAggregator max = new LongMaxAggregator();
+    max.aggregate(new LongWritable(2L));
+    max.aggregate(new LongWritable(3L));
+    assertEquals(3L, max.getAggregatedValue().get());
+    max.setAggregatedValue(new LongWritable(1L));
+    assertEquals(1L, max.getAggregatedValue().get());
+    LongWritable lw = max.createInitialValue();
+    assertNotNull(lw);
+  }
+
+  @Test
+  public void testMinAggregator() {
+    LongMinAggregator min = new LongMinAggregator();
+    min.aggregate(new LongWritable(3L));
+    min.aggregate(new LongWritable(2L));
+    assertEquals(2L, min.getAggregatedValue().get());
+    min.setAggregatedValue(new LongWritable(3L));
+    assertEquals(3L, min.getAggregatedValue().get());
+    LongWritable lw = min.createInitialValue();
+    assertNotNull(lw);
+  }
+
+  @Test
+  public void testOverwriteAggregator() {
+    LongOverwriteAggregator overwrite = new LongOverwriteAggregator();
+    overwrite.aggregate(new LongWritable(1L));
+    assertEquals(1L, overwrite.getAggregatedValue().get());
+    overwrite.aggregate(new LongWritable(2L));
+    assertEquals(2L, overwrite.getAggregatedValue().get());
+    overwrite.setAggregatedValue(new LongWritable(3L));
+    assertEquals(3L, overwrite.getAggregatedValue().get());
+    LongWritable lw = overwrite.createInitialValue();
+    assertNotNull(lw);
+  }
+  
+  @Test
+  public void testProductAggregator() {
+    LongProductAggregator product = new LongProductAggregator();
+    product.aggregate(new LongWritable(6L));
+    product.aggregate(new LongWritable(7L));
+    assertEquals(42L, product.getAggregatedValue().get());
+    product.setAggregatedValue(new LongWritable(1L));
+    assertEquals(1L, product.getAggregatedValue().get());
+    LongWritable lw = product.createInitialValue();
+    assertNotNull(lw);
+  }
+
+  @Test
+  public void testSumAggregator() {
+    LongSumAggregator sum = new LongSumAggregator();
+    sum.aggregate(new LongWritable(1L));
+    sum.aggregate(new LongWritable(2L));
+    assertEquals(3L, sum.getAggregatedValue().get());
+    sum.setAggregatedValue(new LongWritable(4L));
+    assertEquals(4L, sum.getAggregatedValue().get());
+    LongWritable lw = sum.createInitialValue();
+    assertNotNull(lw);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
new file mode 100644
index 0000000..14e590c
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+/**
+ * Test the netty connections
+ */
+public class ConnectionTest {
+  /** Class configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  public static class IntVertex extends EdgeListVertex<IntWritable,
+          IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Before
+  public void setUp() {
+    GiraphConfiguration tmpConfig = new GiraphConfiguration();
+    tmpConfig.setVertexClass(IntVertex.class);
+    conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
+  }
+
+  /**
+   * Test connecting a single client to a single server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectSingleClientServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        MockUtils.createNewServerData(conf, context);
+    WorkerInfo workerInfo = new WorkerInfo();
+    NettyServer server =
+        new NettyServer(conf,
+            new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
+    server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(workerInfo));
+
+    client.stop();
+    server.stop();
+  }
+
+  /**
+   * Test connecting one client to three servers, each with its own task id.
+   *
+   * @throws IOException propagated from the Netty server or client calls
+   */
+  @Test
+  public void connectOneClientToThreeServers() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        MockUtils.createNewServerData(conf, context);
+    RequestServerHandler.Factory requestServerHandlerFactory =
+        new WorkerRequestServerHandler.Factory(serverData);
+
+    WorkerInfo workerInfo1 = new WorkerInfo();
+    workerInfo1.setTaskId(1);
+    NettyServer server1 =
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo1, context);
+    server1.start();
+    workerInfo1.setInetSocketAddress(server1.getMyAddress());
+
+    WorkerInfo workerInfo2 = new WorkerInfo();
+    workerInfo2.setTaskId(2);
+    NettyServer server2 =
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
+            context);
+    server2.start();
+    workerInfo2.setInetSocketAddress(server2.getMyAddress());
+
+    WorkerInfo workerInfo3 = new WorkerInfo();
+    workerInfo3.setTaskId(3);
+    NettyServer server3 =
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
+            context);
+    server3.start();
+    workerInfo3.setInetSocketAddress(server3.getMyAddress());
+
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
+        workerInfo2, workerInfo3);
+    client.connectAllAddresses(addresses);
+
+    client.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  /**
+   * Test connecting three clients to one server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectThreeClientsToOneServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        MockUtils.createNewServerData(conf, context);
+    WorkerInfo workerInfo = new WorkerInfo();
+    NettyServer server = new NettyServer(conf,
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
+    server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+
+    List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
+    NettyClient client1 = new NettyClient(context, conf, new WorkerInfo());
+    client1.connectAllAddresses(addresses);
+    NettyClient client2 = new NettyClient(context, conf, new WorkerInfo());
+    client2.connectAllAddresses(addresses);
+    NettyClient client3 = new NettyClient(context, conf, new WorkerInfo());
+    client3.connectAllAddresses(addresses);
+
+    client1.stop();
+    client2.stop();
+    client3.stop();
+    server.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
new file mode 100644
index 0000000..f296204
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/IncreasingBitSetTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.utils.IncreasingBitSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IncreasingBitSet} (add, has, size, cardinality).
+ */
+public class IncreasingBitSetTest {
+  @Test
+  public void add256kIntegers() {
+    IncreasingBitSet bitSet = new IncreasingBitSet();
+    for (int i = 0; i < 256 * 1024; ++i) {
+      assertFalse(bitSet.has(i));
+      assertTrue(bitSet.add(i));
+      assertTrue(bitSet.has(i));
+      assertTrue(bitSet.size() <=
+          IncreasingBitSet.MIN_BITS_TO_SHIFT);
+    }
+    assertEquals(256 * 1024L, bitSet.getLastBaseKey());
+  }
+
+  @Test
+  public void add256kIntegersAlternate() {
+    IncreasingBitSet bitSet = new IncreasingBitSet();
+    for (int i = 0; i < 256 * 1024; i += 2) {
+      assertFalse(bitSet.has(i));
+      assertTrue(bitSet.add(i));
+      assertTrue(bitSet.has(i));
+      assertFalse(bitSet.has(i + 1));
+      assertTrue(bitSet.size() <= 256 * 1024);
+    }
+    assertEquals(128 * 1024L, bitSet.cardinality());
+    for (int i = 1; i < 256 * 1024; i += 2) {
+      assertFalse(bitSet.has(i));
+      assertTrue(bitSet.add(i));
+      assertTrue(bitSet.has(i));
+      assertTrue(bitSet.has(i - 1));
+      assertTrue(bitSet.size() <= 256 * 1024);
+    }
+    assertEquals(256 * 1024L, bitSet.cardinality());
+  }
+
+  @Test
+  public void add256kIntegersOutOfOrder() {
+    IncreasingBitSet bitSet = new IncreasingBitSet();
+    for (int i = 128 * 1024; i < 256 * 1024; ++i) {
+      assertFalse(bitSet.has(i));
+      assertTrue(bitSet.add(i));
+      assertTrue(bitSet.has(i));
+      assertTrue(bitSet.size() <= 512 * 1024);
+    }
+    assertEquals(128 * 1024L, bitSet.cardinality());
+    for (int i = 0; i < 128 * 1024; ++i) {
+      assertFalse(bitSet.has(i));
+      assertTrue(bitSet.add(i));
+      assertTrue(bitSet.has(i));
+      assertTrue(bitSet.size() <= 512 * 1024);
+    }
+    assertEquals(256 * 1024L, bitSet.cardinality());
+    assertEquals(256 * 1024L, bitSet.getLastBaseKey());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
new file mode 100644
index 0000000..4b41f63
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test all the netty failure scenarios
+ */
+public class RequestFailureTest {
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Server data */
+  private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  serverData;
+  /** Server */
+  private NettyServer server;
+  /** Client */
+  private NettyClient client;
+  /** Mock context */
+  private Context context;
+
+  /**
+   * Only for testing.
+   */
+  public static class TestVertex extends EdgeListVertex<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    // Setup the conf
+    GiraphConfiguration tmpConf = new GiraphConfiguration();
+    tmpConf.setVertexClass(TestVertex.class);
+    conf = new ImmutableClassesGiraphConfiguration(tmpConf);
+
+    context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+  }
+
+  private WritableRequest getRequest() {
+    // Data to send
+    final int partitionId = 0;
+    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+                IntWritable>>
+        dataToSend = new PairList<Integer,
+        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+    dataToSend.initialize();
+    ByteArrayVertexIdMessages<IntWritable,
+            IntWritable> vertexIdMessages =
+        new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+    vertexIdMessages.setConf(conf);
+    vertexIdMessages.initialize();
+    dataToSend.add(partitionId, vertexIdMessages);
+    for (int i = 1; i < 7; ++i) {
+      IntWritable vertexId = new IntWritable(i);
+      for (int j = 0; j < i; ++j) {
+        vertexIdMessages.add(vertexId, new IntWritable(j));
+      }
+    }
+
+    // Send the request
+    SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
+            IntWritable> request =
+        new SendWorkerMessagesRequest<IntWritable, IntWritable,
+                    IntWritable, IntWritable>(dataToSend);
+    return request;
+  }
+
+  private void checkResult(int numRequests) throws IOException {
+    // Check the output
+    Iterable<IntWritable> vertices =
+        serverData.getIncomingMessageStore().getDestinationVertices();
+    int keySum = 0;
+    int messageSum = 0;
+    for (IntWritable vertexId : vertices) {
+      keySum += vertexId.get();
+      Iterable<IntWritable> messages =
+          serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+      synchronized (messages) {
+        for (IntWritable message : messages) {
+          messageSum += message.get();
+        }
+      }
+    }
+    assertEquals(21, keySum);
+    assertEquals(35 * numRequests, messageSum);
+  }
+
+  @Test
+  public void send2Requests() throws IOException {
+    checkSendingTwoRequests();
+  }
+
+  @Test
+  public void alreadyProcessedRequest() throws IOException {
+    // Simulate a failure of the first response
+    conf.setBoolean(GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED, true);
+    // One second to finish a request
+    conf.setInt(GiraphConstants.MAX_REQUEST_MILLISECONDS, 1000);
+    // Loop every 2 seconds
+    conf.setInt(GiraphConstants.WAITING_REQUEST_MSECS, 2000);
+
+    checkSendingTwoRequests();
+  }
+
+  @Test
+  public void resendRequest() throws IOException {
+    // Force a drop of the first request
+    conf.setBoolean(GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED, true);
+    // One second to finish a request
+    conf.setInt(GiraphConstants.MAX_REQUEST_MILLISECONDS, 1000);
+    // Loop every 2 seconds
+    conf.setInt(GiraphConstants.WAITING_REQUEST_MSECS, 2000);
+
+    checkSendingTwoRequests();
+  }
+
+  private void checkSendingTwoRequests() throws IOException {
+    // Start the service
+    serverData = MockUtils.createNewServerData(conf, context);
+    WorkerInfo workerInfo = new WorkerInfo();
+    server = new NettyServer(conf,
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
+    server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+    client = new NettyClient(context, conf, new WorkerInfo());
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(workerInfo));
+
+    // Send two requests; each should be processed exactly once
+    WritableRequest request1 = getRequest();
+    WritableRequest request2 = getRequest();
+    client.sendWritableRequest(workerInfo.getTaskId(), request1);
+    client.sendWritableRequest(workerInfo.getTaskId(), request2);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output (each of the two requests processed exactly once)
+    checkResult(2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
new file mode 100644
index 0000000..d3bf7c3
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
+import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test all the different netty requests.
+ */
+public class RequestTest {
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Server data */
+  private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  serverData;
+  /** Server */
+  private NettyServer server;
+  /** Client */
+  private NettyClient client;
+  /** Worker info */
+  private WorkerInfo workerInfo;
+
+  /**
+   * Only for testing.
+   */
+  public static class TestVertex extends EdgeListVertex<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    // Setup the conf
+    GiraphConfiguration tmpConf = new GiraphConfiguration();
+    tmpConf.setClass(GiraphConstants.VERTEX_CLASS, TestVertex.class,
+        Vertex.class);
+    conf = new ImmutableClassesGiraphConfiguration(tmpConf);
+
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    // Start the service
+    serverData = MockUtils.createNewServerData(conf, context);
+    workerInfo = new WorkerInfo();
+    server = new NettyServer(conf,
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
+    server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+    client = new NettyClient(context, conf, new WorkerInfo());
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(workerInfo));
+  }
+
+  @Test
+  public void sendVertexPartition() throws IOException {
+    // Data to send: ten vertices with ids (and values) 0..9 in one partition
+    int partitionId = 13;
+    Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition =
+        conf.createPartition(partitionId, null);
+    for (int i = 0; i < 10; ++i) {
+      TestVertex vertex = new TestVertex();
+      vertex.initialize(new IntWritable(i), new IntWritable(i));
+      partition.putVertex(vertex);
+    }
+
+    // Send the request
+    SendVertexRequest<IntWritable, IntWritable, IntWritable,
+    IntWritable> request =
+      new SendVertexRequest<IntWritable, IntWritable,
+      IntWritable, IntWritable>(partition);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output: received vertex ids must sum to 0 + 1 + ... + 9 = 45
+    PartitionStore<IntWritable, IntWritable,
+        IntWritable, IntWritable> partitionStore =
+        serverData.getPartitionStore();
+    assertTrue(partitionStore.hasPartition(partitionId));
+    int total = 0;
+    for (Vertex<IntWritable, IntWritable,
+        IntWritable, IntWritable> vertex :
+        partitionStore.getPartition(partitionId)) {
+      total += vertex.getId().get();
+    }
+    assertEquals(45, total);
+  }
+
+  @Test
+  public void sendWorkerMessagesRequest() throws IOException {
+    // Data to send
+    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+            IntWritable>>
+        dataToSend = new PairList<Integer,
+        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+    dataToSend.initialize();
+    int partitionId = 0;
+    ByteArrayVertexIdMessages<IntWritable,
+            IntWritable> vertexIdMessages =
+        new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+    vertexIdMessages.setConf(conf);
+    vertexIdMessages.initialize();
+    dataToSend.add(partitionId, vertexIdMessages);
+    for (int i = 1; i < 7; ++i) {
+      IntWritable vertexId = new IntWritable(i);
+      for (int j = 0; j < i; ++j) {
+        vertexIdMessages.add(vertexId, new IntWritable(j));
+      }
+    }
+
+    // Send the request
+    SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request =
+      new SendWorkerMessagesRequest<IntWritable, IntWritable,
+            IntWritable, IntWritable>(dataToSend);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    Iterable<IntWritable> vertices =
+        serverData.getIncomingMessageStore().getDestinationVertices();
+    int keySum = 0;
+    int messageSum = 0;
+    for (IntWritable vertexId : vertices) {
+      keySum += vertexId.get();
+      Iterable<IntWritable> messages =
+          serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+      synchronized (messages) {
+        for (IntWritable message : messages) {
+          messageSum += message.get();
+        }
+      }
+    }
+    assertEquals(21, keySum);
+    assertEquals(35, messageSum);
+  }
+
+  @Test
+  public void sendPartitionMutationsRequest() throws IOException {
+    // Data to send: one set of mutations for each vertex id 0..10
+    int partitionId = 19;
+    Map<IntWritable, VertexMutations<IntWritable, IntWritable,
+    IntWritable, IntWritable>> vertexIdMutations =
+        Maps.newHashMap();
+    for (int i = 0; i < 11; ++i) {
+      VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
+      mutations =
+      new VertexMutations<IntWritable, IntWritable,
+      IntWritable, IntWritable>();
+      for (int j = 0; j < 3; ++j) {
+        TestVertex vertex = new TestVertex();
+        vertex.initialize(new IntWritable(i), new IntWritable(j));
+        mutations.addVertex(vertex);
+      }
+      for (int j = 0; j < 2; ++j) {
+        mutations.removeVertex();
+      }
+      for (int j = 0; j < 5; ++j) {
+        Edge<IntWritable, IntWritable> edge =
+            new Edge<IntWritable, IntWritable>(
+                new IntWritable(i), new IntWritable(2*j));
+        mutations.addEdge(edge);
+      }
+      for (int j = 0; j < 7; ++j) {
+        mutations.removeEdge(new IntWritable(j));
+      }
+      vertexIdMutations.put(new IntWritable(i), mutations);
+    }
+
+    // Send the request
+    SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
+    IntWritable> request =
+      new SendPartitionMutationsRequest<IntWritable, IntWritable,
+      IntWritable, IntWritable>(partitionId, vertexIdMutations);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output (expected sums follow from the loops that built it)
+    ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
+    IntWritable, IntWritable>> inVertexIdMutations =
+        serverData.getVertexMutations();
+    int keySum = 0;
+    for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
+        IntWritable, IntWritable>> entry :
+          inVertexIdMutations.entrySet()) {
+      synchronized (entry.getValue()) {
+        keySum += entry.getKey().get();
+        int vertexValueSum = 0;
+        for (Vertex<IntWritable, IntWritable, IntWritable, IntWritable>
+        vertex : entry.getValue().getAddedVertexList()) {
+          vertexValueSum += vertex.getValue().get();
+        }
+        assertEquals(3, vertexValueSum);
+        assertEquals(2, entry.getValue().getRemovedVertexCount());
+        int addedEdgeValueSum = 0;
+        for (Edge<IntWritable, IntWritable> edge :
+          entry.getValue().getAddedEdgeList()) {
+          addedEdgeValueSum += edge.getValue().get();
+        }
+        assertEquals(20, addedEdgeValueSum);
+      }
+    }
+    assertEquals(55, keySum);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
new file mode 100644
index 0000000..0f9c64e
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.SaslServerHandler;
+import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Netty connection with mocked authentication.
+ */
+public class SaslConnectionTest {
+  /** Class configuration */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  public static class IntVertex extends EdgeListVertex<IntWritable,
+        IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Before
+  public void setUp() {
+    GiraphConfiguration tmpConfig = new GiraphConfiguration();
+    tmpConfig.setVertexClass(IntVertex.class);
+    tmpConfig.setBoolean(GiraphConstants.AUTHENTICATE, true);
+    conf = new ImmutableClassesGiraphConfiguration(tmpConfig);
+  }
+
+  /**
+   * Test connecting a single client to a single server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectSingleClientServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        MockUtils.createNewServerData(conf, context);
+
+    SaslServerHandler.Factory mockedSaslServerFactory =
+        Mockito.mock(SaslServerHandler.Factory.class);
+
+    SaslServerHandler mockedSaslServerHandler =
+        Mockito.mock(SaslServerHandler.class);
+    when(mockedSaslServerFactory.newHandler(conf)).
+        thenReturn(mockedSaslServerHandler);
+
+    WorkerInfo workerInfo = new WorkerInfo();
+    workerInfo.setTaskId(-1);
+    NettyServer server =
+        new NettyServer(conf,
+            new WorkerRequestServerHandler.Factory(serverData),
+            workerInfo,
+            context,
+            mockedSaslServerFactory);
+    server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo));
+
+    client.stop();
+    server.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
new file mode 100644
index 0000000..c6da853
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.BasicMessageStore;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
+import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.CollectionUtils;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Test for different types of message stores */
+public class TestMessageStores {
+  private static String directory;
+  private static ImmutableClassesGiraphConfiguration config;
+  private static TestData testData;
+  private static
+  CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, IntWritable>
+      service;
+  /**
+   * Pseudo-random number generator with a fixed seed (makes runs
+   * reproducible, which helps with debugging)
+   */
+  private static final Random RANDOM = new Random(101);
+
+  private static class IntVertex extends EdgeListVertex<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Before
+  public void prepare() {
+    directory = "test/";
+
+    Configuration.addDefaultResource("giraph-site.xml");
+    GiraphConfiguration initConfig = new GiraphConfiguration();
+    initConfig.setVertexClass(IntVertex.class);
+    config = new ImmutableClassesGiraphConfiguration(initConfig);
+
+    testData = new TestData();
+    testData.maxId = 1000000;
+    testData.maxMessage = 1000000;
+    testData.maxNumberOfMessages = 100;
+    testData.numVertices = 50;
+    testData.numTimes = 10;
+    testData.numOfPartitions = 5;
+    testData.maxMessagesInMemory = 20;
+
+    service =
+        MockUtils.mockServiceGetVertexPartitionOwner(testData.numOfPartitions);
+
+    new File(directory).mkdir();
+  }
+
+  @After
+  public void cleanUp() {
+    new File(directory).delete();
+  }
+
+  private static class TestData {
+    int numTimes;
+    int numVertices;
+    int maxNumberOfMessages;
+    int maxId;
+    int maxMessage;
+    int numOfPartitions;
+    int maxMessagesInMemory;
+  }
+
+  private SortedMap<IntWritable, Collection<IntWritable>> createRandomMessages(
+      TestData testData) {
+    SortedMap<IntWritable, Collection<IntWritable>> allMessages =
+        new TreeMap<IntWritable, Collection<IntWritable>>();
+    for (int v = 0; v < testData.numVertices; v++) {
+      int messageNum =
+          (int) (RANDOM.nextFloat() * testData.maxNumberOfMessages);
+      Collection<IntWritable> vertexMessages = Lists.newArrayList();
+      for (int m = 0; m < messageNum; m++) {
+        vertexMessages.add(
+            new IntWritable((int) (RANDOM.nextFloat() * testData.maxMessage)));
+      }
+      IntWritable vertexId =
+          new IntWritable((int) (RANDOM.nextFloat() * testData.maxId));
+      allMessages.put(vertexId, vertexMessages);
+    }
+    return allMessages;
+  }
+
+  /**
+   * Used for testing only
+   */
+  private static class InputMessageStore extends
+      ByteArrayMessagesPerVertexStore<IntWritable, IntWritable> {
+
+    /**
+     * Constructor
+     *
+     * @param service Service worker
+     * @param config  Hadoop configuration
+     */
+    InputMessageStore(
+        CentralizedServiceWorker<IntWritable, ?, ?, IntWritable> service,
+        ImmutableClassesGiraphConfiguration<IntWritable, ?, ?,
+            IntWritable> config,
+        Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
+      super(service, config);
+      // Adds all the messages to the store
+      for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
+          inputMap.entrySet()) {
+        int partitionId = getPartitionId(entry.getKey());
+        ByteArrayVertexIdMessages<IntWritable, IntWritable>
+            byteArrayVertexIdMessages =
+            new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+        byteArrayVertexIdMessages.setConf(config);
+        byteArrayVertexIdMessages.initialize();
+        for (IntWritable message : entry.getValue()) {
+          byteArrayVertexIdMessages.add(entry.getKey(), message);
+        }
+        try {
+          addPartitionMessages(partitionId, byteArrayVertexIdMessages);
+        } catch (IOException e) {
+          throw new IllegalStateException("Got IOException", e);
+        }
+      }
+    }
+  }
+
+  private void putNTimes(
+      MessageStore<IntWritable, IntWritable> messageStore,
+      Map<IntWritable, Collection<IntWritable>> messages,
+      TestData testData) throws IOException {
+    for (int n = 0; n < testData.numTimes; n++) {
+      SortedMap<IntWritable, Collection<IntWritable>> batch =
+          createRandomMessages(testData);
+      messageStore.addMessages(new InputMessageStore(service, config,
+          batch));
+      for (Entry<IntWritable, Collection<IntWritable>> entry :
+          batch.entrySet()) {
+        if (messages.containsKey(entry.getKey())) {
+          messages.get(entry.getKey()).addAll(entry.getValue());
+        } else {
+          messages.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+  }
+
+  private <I extends WritableComparable, M extends Writable> boolean
+  equalMessages(
+      MessageStore<I, M> messageStore,
+      Map<I, Collection<M>> expectedMessages) throws IOException {
+    TreeSet<I> vertexIds = Sets.newTreeSet();
+    Iterables.addAll(vertexIds, messageStore.getDestinationVertices());
+    for (I vertexId : vertexIds) {
+      Iterable<M> expected = expectedMessages.get(vertexId);
+      if (expected == null) {
+        return false;
+      }
+      Iterable<M> actual = messageStore.getVertexMessages(vertexId);
+      if (!CollectionUtils.isEqual(expected, actual)) {
+        System.err.println("equalMessages: For vertexId " + vertexId +
+            " expected " + expected + ", but got " + actual);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
+      MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
+      S messageStore) throws IOException {
+    File file = new File(directory + "messageStoreTest");
+    if (file.exists()) {
+      file.delete();
+    }
+    file.createNewFile();
+    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        (new FileOutputStream(file))));
+    messageStore.write(out);
+    out.close();
+
+    messageStore = messageStoreFactory.newStore();
+
+    DataInputStream in = new DataInputStream(new BufferedInputStream(
+        (new FileInputStream(file))));
+    messageStore.readFields(in);
+    in.close();
+    file.delete();
+
+    return messageStore;
+  }
+
+  private <S extends MessageStore<IntWritable, IntWritable>> void
+  testMessageStore(
+      MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
+      TestData testData) throws IOException {
+    SortedMap<IntWritable, Collection<IntWritable>> messages =
+        new TreeMap<IntWritable, Collection<IntWritable>>();
+    S messageStore = messageStoreFactory.newStore();
+    putNTimes(messageStore, messages, testData);
+    assertTrue(equalMessages(messageStore, messages));
+    messageStore.clearAll();
+    messageStore = doCheckpoint(messageStoreFactory, messageStore);
+    assertTrue(equalMessages(messageStore, messages));
+    messageStore.clearAll();
+  }
+
+  @Test
+  public void testByteArrayMessagesPerVertexStore() {
+    try {
+      testMessageStore(
+          ByteArrayMessagesPerVertexStore.newFactory(service, config),
+          testData);
+    } catch (IOException e) {
+      throw new IllegalStateException("Got IOException", e);
+    }
+  }
+
+  @Test
+  public void testDiskBackedMessageStore() {
+    try {
+      MessageStoreFactory<IntWritable, IntWritable,
+          BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+          SequentialFileMessageStore.newFactory(config);
+      MessageStoreFactory<IntWritable, IntWritable,
+          FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
+          DiskBackedMessageStore.newFactory(config, fileStoreFactory);
+      testMessageStore(diskStoreFactory, testData);
+    } catch (IOException e) {
+      throw new IllegalStateException("Got IOException", e);
+    }
+  }
+
+  @Test
+  public void testDiskBackedMessageStoreByPartition() {
+    try {
+      MessageStoreFactory<IntWritable, IntWritable,
+          BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+          SequentialFileMessageStore.newFactory(config);
+      MessageStoreFactory<IntWritable, IntWritable,
+          FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
+          DiskBackedMessageStore.newFactory(config, fileStoreFactory);
+      testMessageStore(DiskBackedMessageStoreByPartition.newFactory(service,
+          testData.maxMessagesInMemory, diskStoreFactory), testData);
+    } catch (IOException e) {
+      throw new IllegalStateException("Got IOException", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
new file mode 100644
index 0000000..fa5c693
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.io.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.IntIntNullIntTextInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Tests for {@link ConnectedComponentsVertex}
+ */
+public class ConnectedComponentsVertexTest {
+
+    /**
+     * A local integration test on toy data
+     */
+    @Test
+    public void testToyData() throws Exception {
+
+        // a small graph with three components
+        String[] graph = new String[] {
+                "1 2 3",
+                "2 1 4 5",
+                "3 1 4",
+                "4 2 3 5 13",
+                "5 2 4 12 13",
+                "12 5 13",
+                "13 4 5 12",
+
+                "6 7 8",
+                "7 6 10 11",
+                "8 6 10",
+                "10 7 8 11",
+                "11 7 10",
+
+                "9" };
+
+        GiraphClasses classes = new GiraphClasses();
+        classes.setVertexClass(ConnectedComponentsVertex.class);
+        classes.setCombinerClass(MinimumIntCombiner.class);
+        classes.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+        classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+        Map<String, String> emptyParams = ImmutableMap.of();
+
+        // run internally
+        Iterable<String> results = InternalVertexRunner.run(classes,
+            emptyParams, graph);
+
+        SetMultimap<Integer,Integer> components = parseResults(results);
+
+        Set<Integer> componentIDs = components.keySet();
+        assertEquals(3, componentIDs.size());
+        assertTrue(componentIDs.contains(1));
+        assertTrue(componentIDs.contains(6));
+        assertTrue(componentIDs.contains(9));
+
+        Set<Integer> componentOne = components.get(1);
+        assertEquals(7, componentOne.size());
+        assertTrue(componentOne.contains(1));
+        assertTrue(componentOne.contains(2));
+        assertTrue(componentOne.contains(3));
+        assertTrue(componentOne.contains(4));
+        assertTrue(componentOne.contains(5));
+        assertTrue(componentOne.contains(12));
+        assertTrue(componentOne.contains(13));
+
+        Set<Integer> componentTwo = components.get(6);
+        assertEquals(5, componentTwo.size());
+        assertTrue(componentTwo.contains(6));
+        assertTrue(componentTwo.contains(7));
+        assertTrue(componentTwo.contains(8));
+        assertTrue(componentTwo.contains(10));
+        assertTrue(componentTwo.contains(11));
+
+        Set<Integer> componentThree = components.get(9);
+        assertEquals(1, componentThree.size());
+        assertTrue(componentThree.contains(9));
+    }
+
+    private SetMultimap<Integer,Integer> parseResults(
+            Iterable<String> results) {
+        SetMultimap<Integer,Integer> components = HashMultimap.create();
+        for (String result : results) {
+            Iterable<String> parts = Splitter.on('\t').split(result);
+            int vertex = Integer.parseInt(Iterables.get(parts, 0));
+            int component = Integer.parseInt(Iterables.get(parts, 1));
+            components.put(component, vertex);
+        }
+        return components;
+    }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
new file mode 100644
index 0000000..99d0b38
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.graph.Combiner;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+public class MinimumIntCombinerTest {
+
+  @Test
+  public void testCombiner() throws Exception {
+    Combiner<IntWritable, IntWritable> combiner =
+        new MinimumIntCombiner();
+
+    IntWritable vertexId = new IntWritable(1);
+    IntWritable result = combiner.createInitialMessage();
+    combiner.combine(vertexId, result, new IntWritable(39947466));
+    combiner.combine(vertexId, result, new IntWritable(199));
+    combiner.combine(vertexId, result, new IntWritable(42));
+    combiner.combine(vertexId, result, new IntWritable(19998888));
+    assertEquals(42, result.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
new file mode 100644
index 0000000..4052fe1
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.RandomWalkVertex.RandomWalkVertexMasterCompute;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RandomWalkWithRestartVertex}
+ */
+public class RandomWalkWithRestartVertexTest {
+
+  /** Minimum difference between doubles */
+  private static final double EPSILON = 10e-3;
+
+  /**
+   * A local integration test on toy data
+   */
+  @Test
+  public void testToyData() throws Exception {
+
+    // A small graph
+    String[] graph = new String[] { "12 34 56", "34 78", "56 34 78", "78 34" };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+    params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+    params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.25");
+
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(RandomWalkWithRestartVertex.class);
+    classes.setVertexInputFormatClass(
+        LongDoubleFloatDoubleTextInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        VertexWithDoubleValueFloatEdgeTextOutputFormat.class);
+    classes.setWorkerContextClass(RandomWalkWorkerContext.class);
+    classes.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
+    // Run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        parseSteadyStateProbabilities(results);
+    // values computed with external software
+    // 0.25, 0.354872, 0.09375, 0.301377
+    assertEquals(0.25, steadyStateProbabilities.get(12L), EPSILON);
+    assertEquals(0.354872, steadyStateProbabilities.get(34L), EPSILON);
+    assertEquals(0.09375, steadyStateProbabilities.get(56L), EPSILON);
+    assertEquals(0.301377, steadyStateProbabilities.get(78L), EPSILON);
+  }
+
+  /**
+   * A local integration test on toy data
+   */
+  @Test
+  public void testWeightedGraph() throws Exception {
+    // A small graph
+    String[] graph =
+        new String[] { "12 34:0.1 56:0.9", "34 78:0.9 56:0.1",
+          "56 12:0.1 34:0.8 78:0.1", "78 34:1.0" };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+    params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+    params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.15");
+
+    GiraphClasses classes = new GiraphClasses();
+    classes.setVertexClass(RandomWalkWithRestartVertex.class);
+    classes.setVertexInputFormatClass(
+        NormalizingLongDoubleFloatDoubleTextInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        VertexWithDoubleValueFloatEdgeTextOutputFormat.class);
+    classes.setWorkerContextClass(RandomWalkWorkerContext.class);
+    classes.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
+    // Run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        parseSteadyStateProbabilities(results);
+    // values computed with external software
+    // 0.163365, 0.378932, 0.156886, 0.300816
+    assertEquals(0.163365, steadyStateProbabilities.get(12L), EPSILON);
+    assertEquals(0.378932, steadyStateProbabilities.get(34L), EPSILON);
+    assertEquals(0.156886, steadyStateProbabilities.get(56L), EPSILON);
+    assertEquals(0.300816, steadyStateProbabilities.get(78L), EPSILON);
+  }
+
+  /**
+   * Parse steady state probabilities.
+   * @param results The steady state probabilities in text format.
+   * @return A map representation of the steady state probabilities.
+   */
+  private Map<Long, Double> parseSteadyStateProbabilities(
+      Iterable<String> results) {
+    Map<Long, Double> result = Maps.newHashMap();
+    for (String s : results) {
+      String[] tokens = s.split("\\t");
+      Long id = Long.parseLong(tokens[0]);
+      Double value = Double.parseDouble(tokens[1]);
+      result.put(id, value);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
new file mode 100644
index 0000000..a90dae7
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexInputFormat;
+import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Contains a simple unit test for {@link SimpleShortestPathsVertex}
+ */
+public class SimpleShortestPathsVertexTest {
+
+  /**
+   * Test the behavior when a shorter path to a vertex has been found
+   */
+  @Test
+  public void testOnShorterPathFound() throws Exception {
+
+    SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+    vertex.initialize(null, null);
+    vertex.addEdge(new Edge<LongWritable, FloatWritable>(
+        new LongWritable(10L), new FloatWritable(2.5f)));
+    vertex.addEdge(new Edge<LongWritable, FloatWritable>(
+        new LongWritable(20L), new FloatWritable(0.5f)));
+
+    MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+        new LongWritable(7L), new DoubleWritable(Double.MAX_VALUE),
+        false);
+
+    Mockito.when(env.getConfiguration().getLong(
+        SimpleShortestPathsVertex.SOURCE_ID,
+        SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+
+    vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+        new DoubleWritable(1.5)));
+
+    assertTrue(vertex.isHalted());
+    assertEquals(1.5d, vertex.getValue().get(), 0d);
+
+    env.verifyMessageSent(new LongWritable(10L), new DoubleWritable(4));
+    env.verifyMessageSent(new LongWritable(20L), new DoubleWritable(2));
+  }
+
+  /**
+   * Test the behavior when a new, but not shorter path to a vertex has been
+   * found.
+   */
+  @Test
+  public void testOnNoShorterPathFound() throws Exception {
+
+    SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+    vertex.initialize(new LongWritable(0), new DoubleWritable(0.0));
+    vertex.addEdge(new Edge<LongWritable, FloatWritable>(
+        new LongWritable(10L), new FloatWritable(2.5f)));
+    vertex.addEdge(new Edge<LongWritable, FloatWritable>(
+        new LongWritable(20L), new FloatWritable(0.5f)));
+
+    MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+        new LongWritable(7L), new DoubleWritable(0.5), false);
+
+    Mockito.when(env.getConfiguration().getLong(
+        SimpleShortestPathsVertex.SOURCE_ID,
+        SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+
+    vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+        new DoubleWritable(1.5)));
+
+    assertTrue(vertex.isHalted());
+    assertEquals(0.5d, vertex.getValue().get(), 0d);
+
+    env.verifyNoMessageSent();
+  }
+
+  /**
+   * A local integration test on toy data
+   */
+  @Test
+  public void testToyData() throws Exception {
+
+    // a small four vertex graph
+    String[] graph = new String[] {
+        "[1,0,[[2,1],[3,3]]]",
+        "[2,0,[[3,1],[4,10]]]",
+        "[3,0,[[4,2]]]",
+        "[4,0,[]]"
+    };
+
+    // start from vertex 1
+    Map<String, String> params = Maps.newHashMap();
+    params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
+
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(SimpleShortestPathsVertex.class);
+    classes.setVertexInputFormatClass(
+        JsonLongDoubleFloatDoubleVertexInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        JsonLongDoubleFloatDoubleVertexOutputFormat.class);
+
+    // run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> distances = parseDistances(results);
+
+    // verify results
+    assertNotNull(distances);
+    assertEquals(4, (int) distances.size());
+    assertEquals(0.0, (double) distances.get(1L), 0d);
+    assertEquals(1.0, (double) distances.get(2L), 0d);
+    assertEquals(2.0, (double) distances.get(3L), 0d);
+    assertEquals(4.0, (double) distances.get(4L), 0d);
+  }
+
+  private Map<Long, Double> parseDistances(Iterable<String> results) {
+    Map<Long, Double> distances =
+        Maps.newHashMapWithExpectedSize(Iterables.size(results));
+    for (String line : results) {
+      try {
+        JSONArray jsonVertex = new JSONArray(line);
+        distances.put(jsonVertex.getLong(0), jsonVertex.getDouble(1));
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+            "Couldn't get vertex from line " + line, e);
+      }
+    }
+    return distances;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
new file mode 100644
index 0000000..6af7339
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Contains a simple unit test for {@link SimpleTriangleClosingVertex}
+ */
+public class SimpleTriangleClosingVertexTest {
+
+  /**
+   * Test the behavior of the triangle closing algorithm:
+   * does it send all its out edge values to all neighbors?
+   */
+  @Test
+  public void testSuperstepZero() throws Exception {
+    // this guy should end up with an array value of 4
+    SimpleTriangleClosingVertex vertex =
+      new SimpleTriangleClosingVertex();
+    SimpleTriangleClosingVertex.IntArrayListWritable alw =
+      new SimpleTriangleClosingVertex.IntArrayListWritable();
+    vertex.initialize(null, null);
+    vertex.addEdge(new Edge<IntWritable, NullWritable>(new IntWritable(5),
+        NullWritable.get()));
+    vertex.addEdge(new Edge<IntWritable, NullWritable>(new IntWritable(7),
+        NullWritable.get()));
+
+    MockUtils.MockedEnvironment<IntWritable,
+      SimpleTriangleClosingVertex.IntArrayListWritable,
+    NullWritable, IntWritable> env =
+      MockUtils.prepareVertex(vertex, 0L,
+        new IntWritable(1), alw, false);
+
+    vertex.compute(Lists.<IntWritable>newArrayList(
+      new IntWritable(83), new IntWritable(42)));
+
+    env.verifyMessageSent(new IntWritable(5), new IntWritable(5));
+    env.verifyMessageSent(new IntWritable(5), new IntWritable(7));
+    env.verifyMessageSent(new IntWritable(7), new IntWritable(5));
+    env.verifyMessageSent(new IntWritable(7), new IntWritable(7));
+  }
+
+  /** Test behavior of compute() with incoming messages (superstep 1) */
+  @Test
+  public void testSuperstepOne() throws Exception {
+    // see if the vertex interprets its incoming
+    // messages properly to verify the algorithm
+    SimpleTriangleClosingVertex vertex =
+      new SimpleTriangleClosingVertex();
+    vertex.initialize(null, null);
+    MockUtils.MockedEnvironment<IntWritable,
+      SimpleTriangleClosingVertex.IntArrayListWritable,
+      NullWritable, IntWritable>
+      env = MockUtils.<IntWritable,
+      SimpleTriangleClosingVertex.IntArrayListWritable,
+      NullWritable, IntWritable> prepareVertex(
+        vertex, 1L, new IntWritable(1), null, false);
+      // superstep 1: can the vertex process these correctly?
+      vertex.compute(Lists.<IntWritable>newArrayList(
+        new IntWritable(7),
+        new IntWritable(3),
+        new IntWritable(4),
+        new IntWritable(7),
+        new IntWritable(4),
+        new IntWritable(2),
+        new IntWritable(4)));
+      final String pairCheck = "[4, 7]";
+      assertEquals(pairCheck, vertex.getValue().toString());
+  }
+ }

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java b/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
new file mode 100644
index 0000000..373483a
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.io.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.IntIntNullIntTextInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Runs {@link ConnectedComponentsVertex} on toy data with
+ * {@link GiraphConstants#FAIL_FIRST_IPC_PORT_BIND_ATTEMPT} switched on and
+ * checks that the expected components are still reported.
+ */
+public class TryMultiIpcBindingPortsTest {
+
+    /**
+     * A local integration test on toy data
+     */
+    @Test
+    public void testToyData() throws Exception {
+
+        // a small graph with three components
+        String[] graph = new String[] {
+                "1 2 3",
+                "2 1 4 5",
+                "3 1 4",
+                "4 2 3 5 13",
+                "5 2 4 12 13",
+                "12 5 13",
+                "13 4 5 12",
+
+                "6 7 8",
+                "7 6 10 11",
+                "8 6 10",
+                "10 7 8 11",
+                "11 7 10",
+
+                "9" };
+
+        // fail the first port binding attempt
+        Map<String, String> params = Maps.<String, String>newHashMap();
+        params.put(GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT, "true");
+
+        GiraphClasses classes = new GiraphClasses();
+        classes.setVertexClass(ConnectedComponentsVertex.class);
+        classes.setCombinerClass(MinimumIntCombiner.class);
+        classes.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+        classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+        // run internally
+        Iterable<String> results = InternalVertexRunner.run(classes, params,
+                graph);
+
+        SetMultimap<Integer,Integer> components = parseResults(results);
+
+        // three components, keyed by 1, 6 and 9
+        assertContainsExactly(components.keySet(), 1, 6, 9);
+
+        assertContainsExactly(components.get(1), 1, 2, 3, 4, 5, 12, 13);
+        assertContainsExactly(components.get(6), 6, 7, 8, 10, 11);
+        assertContainsExactly(components.get(9), 9);
+    }
+
+    /**
+     * Asserts that a set has as many elements as there are expected members
+     * and that it contains each of them.
+     *
+     * @param actual the set under inspection
+     * @param expectedMembers the values the set has to contain
+     */
+    private void assertContainsExactly(Set<Integer> actual,
+            int... expectedMembers) {
+        assertEquals(expectedMembers.length, actual.size());
+        for (int member : expectedMembers) {
+            assertTrue(actual.contains(member));
+        }
+    }
+
+    /**
+     * Groups the vertices of a textual result dump by their component.
+     *
+     * @param results lines holding a vertex and its component, separated
+     *                by a tab character
+     * @return multimap from component to the vertices reported for it
+     */
+    private SetMultimap<Integer,Integer> parseResults(
+            Iterable<String> results) {
+        Splitter tabSplitter = Splitter.on('\t');
+        SetMultimap<Integer,Integer> verticesByComponent =
+                HashMultimap.create();
+        for (String line : results) {
+            Iterable<String> fields = tabSplitter.split(line);
+            // first field is the vertex, second field is its component
+            int vertexId = Integer.parseInt(Iterables.get(fields, 0));
+            int componentId = Integer.parseInt(Iterables.get(fields, 1));
+            verticesByComponent.put(componentId, vertexId);
+        }
+        return verticesByComponent;
+    }
+}


Mime
View raw message