giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ikabi...@apache.org
Subject [1/2] git commit: updated refs/heads/trunk to 3b7c68e
Date Thu, 02 Jul 2015 23:33:13 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk d7e4bde11 -> 3b7c68e54


http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java
new file mode 100644
index 0000000..0f5f1ac
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.combiner.MaxMessageCombiner;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Tests and examples of using SendMessageChain
+ */
+public class TestMessageChain {
+
+  private static GiraphConfiguration createConf() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
+    return conf;
+  }
+
+  private static NumericTestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph =
+        new NumericTestGraph<LongWritable, LongWritable, NullWritable>(createConf());
+    graph.addVertex(1);
+    graph.addVertex(2);
+    graph.addVertex(3);
+    graph.addVertex(4);
+
+    graph.addSymmetricEdge(1, 2);
+    graph.addSymmetricEdge(2, 3);
+    return graph;
+  }
+
+  @Test
+  public void testReply() {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
+
+    // calculates max ID of FOFs
+    Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable>
+    startSendToNeighbors(
+        "SendMyIdToAllNeighbors",
+        LongWritable.class,
+        VertexSuppliers.vertexIdSupplier()
+    ).thenSendToNeighbors(
+        "SendMaxIReceivedToAllNeighbors",
+        LongWritable.class,
+        (vertex, messages) -> new LongWritable(max(messages))
+    ).endConsume(
+        (vertex, messages) -> vertex.getValue().set(max(messages))
+    );
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), reply, new Object());
+
+    Assert.assertEquals(3, graph.getVertex(1).getValue().get());
+    Assert.assertEquals(2, graph.getVertex(2).getValue().get());
+    Assert.assertEquals(3, graph.getVertex(3).getValue().get());
+    Assert.assertEquals(0, graph.getVertex(4).getValue().get());
+  }
+
+  @Test
+  public void testReplyCombiner() {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
+
+    // calculates max ID of FOFs
+    Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable>
+    startSendToNeighbors(
+        "SendMyIdToAllNeighbors",
+        MaxMessageCombiner.LONG,
+        VertexSuppliers.vertexIdSupplier()
+    ).thenSendToNeighbors(
+        "SendMaxIReceivedToAllNeighbors",
+        MaxMessageCombiner.LONG,
+        (vertex, message) -> message
+    ).endConsume(
+        (vertex, message) -> vertex.getValue().set(message != null ? message.get() : 0)
+    );
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), reply, new Object());
+
+    Assert.assertEquals(3, graph.getVertex(1).getValue().get());
+    Assert.assertEquals(2, graph.getVertex(2).getValue().get());
+    Assert.assertEquals(3, graph.getVertex(3).getValue().get());
+    Assert.assertEquals(0, graph.getVertex(4).getValue().get());
+  }
+
+  @Test
+  public void testReplyCombinerEndReduce() {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
+
+    LongRef sumOfAll = new LongRef(0);
+
+    // calculates max ID of FOFs
+    Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable>
+    startSendToNeighbors(
+        "SendMyIdToAllNeighbors",
+        MaxMessageCombiner.LONG,
+        VertexSuppliers.vertexIdSupplier()
+    ).thenSendToNeighbors(
+        "SendMaxIReceivedToAllNeighbors",
+        MaxMessageCombiner.LONG,
+        (vertex, message) -> message
+    ).endReduce(
+        "SumAllReceivedValues",
+        SumReduce.LONG,
+        (vertex, message) -> message != null ? message : new LongWritable(0),
+        (value) -> sumOfAll.value = value.get()
+    );
+
+    LocalBlockRunner.runBlock(
+        graph.getTestGraph(),
+        new SequenceBlock(
+            reply,
+            Pieces.forAllVertices(
+                "SetAllValuesToReduced",
+                (vertex) -> ((LongWritable) vertex.getValue()).set(sumOfAll.value))),
+        new Object());
+
+    Assert.assertEquals(8, graph.getVertex(1).getValue().get());
+    Assert.assertEquals(8, graph.getVertex(2).getValue().get());
+    Assert.assertEquals(8, graph.getVertex(3).getValue().get());
+    Assert.assertEquals(8, graph.getVertex(4).getValue().get());
+
+    // Block execution is happening in the separate environment if SERIALIZE_MASTER is used,
+    // so our instance of sumOfAll will be unchanged
+    Assert.assertEquals(LocalBlockRunner.SERIALIZE_MASTER.getDefaultValue() ? 0 : 8, sumOfAll.value);
+  }
+
+
+  @Test
+  public void testStartCustom() {
+    NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph();
+
+    Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable>
+    startCustom(
+        // Sends ID to it's first neighbor, passing max of received messages to next part of the chain
+        (consumer) -> new Piece<LongWritable, LongWritable, NullWritable, LongWritable, Object>() {
+          @Override
+          public VertexSender<LongWritable, LongWritable, NullWritable> getVertexSender(
+              BlockWorkerSendApi<LongWritable, LongWritable, NullWritable, LongWritable> workerApi,
+              Object executionStage) {
+            return (vertex) -> {
+              Edge<LongWritable, NullWritable> edge =
+                  Iterators.getNext(vertex.getEdges().iterator(), null);
+              if (edge != null) {
+                workerApi.sendMessage(edge.getTargetVertexId(), vertex.getId());
+              }
+            };
+          }
+
+          @Override
+          public VertexReceiver<LongWritable, LongWritable, NullWritable, LongWritable>
+              getVertexReceiver(BlockWorkerReceiveApi<LongWritable> workerApi, Object executionStage) {
+            return (vertex, messages) -> {
+              consumer.apply(vertex, new LongWritable(max(messages)));
+            };
+          }
+
+          @Override
+          protected Class<LongWritable> getMessageClass() {
+            return LongWritable.class;
+          }
+        }
+    ).thenSendToNeighbors(
+        "SendMaxIReceivedToAllNeighbors",
+        SumMessageCombiner.LONG,
+        (vertex, message) -> message
+    ).endConsume(
+        (vertex, message) -> vertex.getValue().set(message != null ? message.get() : 0)
+    );
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), reply, new Object());
+
+    Assert.assertEquals(3, graph.getVertex(1).getValue().get());
+    Assert.assertEquals(2, graph.getVertex(2).getValue().get());
+    Assert.assertEquals(3, graph.getVertex(3).getValue().get());
+    Assert.assertEquals(0, graph.getVertex(4).getValue().get());
+  }
+
+
+
+
+  private static long max(Iterable<LongWritable> messages) {
+    long result = 0;
+    for (LongWritable message : messages) {
+      result = Math.max(result, message.get());
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java
new file mode 100644
index 0000000..e16313a
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Example Application of BFS calculation
+ */
+public class BreadthFirstSearchBlockFactory extends AbstractBlockFactory<Object> {
+  @Override
+  public Block createBlock(GiraphConfiguration conf) {
+    SupplierFromVertex<LongWritable, BreadthFirstSearchVertexValue, Writable, Boolean>
+    isVertexInSeedSet =
+      (vertex) -> {
+        return vertex.getValue().isSeedVertex();
+      };
+
+    SupplierFromVertex<LongWritable, BreadthFirstSearchVertexValue, Writable, IntWritable>
+    getDistance =
+      (vertex) -> {
+        return new IntWritable(vertex.getValue().getDistance());
+      };
+
+    ConsumerWithVertex<LongWritable, BreadthFirstSearchVertexValue, Writable, IntWritable>
+    setDistance =
+      (vertex, value) -> {
+        vertex.getValue().setDistance(value.get());
+      };
+
+    return BreadthFirstSearch.bfs(
+      isVertexInSeedSet,
+      getDistance,
+      setDistance);
+  }
+
+  @Override
+  public Object createExecutionStage(GiraphConfiguration conf) {
+    return new Object();
+  }
+
+  @Override
+  protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
+    return LongWritable.class;
+  }
+
+  @Override
+  protected Class<BreadthFirstSearchVertexValue> getVertexValueClass(GiraphConfiguration conf) {
+    return BreadthFirstSearchVertexValue.class;
+  }
+
+  @Override
+  protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) {
+    return NullWritable.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java
new file mode 100644
index 0000000..88b78e5
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex-value class for BreadthFirstSearchBlockFactory
+ */
+public class BreadthFirstSearchVertexValue implements Writable {
+  private boolean seedVertex;
+  private int distance;
+
+  public boolean isSeedVertex() {
+    return seedVertex;
+  }
+
+  public void setSeedVertex(boolean seedVertex) {
+    this.seedVertex = seedVertex;
+  }
+
+  public int getDistance() {
+    return distance;
+  }
+
+  public void setDistance(int distance) {
+    this.distance = distance;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(seedVertex);
+    out.writeInt(distance);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    seedVertex = in.readBoolean();
+    distance = in.readInt();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java
new file mode 100644
index 0000000..76c9f89
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Example Application of distributed independent set calculation
+ */
+public class DistributedIndependentSetBlockFactory extends AbstractBlockFactory<Object> {
+  @Override
+  public Block createBlock(GiraphConfiguration conf) {
+    return DistributedIndependentSet.
+        <LongWritable, DistributedIndependentSetVertexValue>independentSets(
+            getVertexIDClass(conf),
+            (vertex) -> vertex.getValue().getIndependentSetID(),
+            (vertex, value) -> vertex.getValue().setIndependentSetID(value)
+        );
+  }
+
+  @Override
+  public Object createExecutionStage(GiraphConfiguration conf) {
+    return new Object();
+  }
+
+  @Override
+  protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
+    return LongWritable.class;
+  }
+
+  @Override
+  protected Class<DistributedIndependentSetVertexValue> getVertexValueClass(
+      GiraphConfiguration conf) {
+    return DistributedIndependentSetVertexValue.class;
+  }
+
+  @Override
+  protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) {
+    return NullWritable.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java
new file mode 100644
index 0000000..d29ee00
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex-value class for DistributedIndependentSetBlockFactory
+ */
+public class DistributedIndependentSetVertexValue implements Writable {
+  private IntWritable independentSetID;
+
+  public IntWritable getIndependentSetID() {
+    return independentSetID;
+  }
+
+  public void setIndependentSetID(IntWritable independentSetID) {
+    this.independentSetID = independentSetID;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (independentSetID == null) {
+      independentSetID = new IntWritable();
+    }
+    out.writeInt(independentSetID.get());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (independentSetID == null) {
+      independentSetID = new IntWritable();
+    }
+    independentSetID.set(in.readInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
new file mode 100644
index 0000000..c19fe27
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+public class TestBreadthFirstSearch {
+  private void run(
+    TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, NullWritable> graphLoader,
+    int[] expectedDistances,
+    int[] seedVertices
+  ) throws Exception {
+    TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, NullWritable> valueLoader =
+      (graph) -> {
+        List<Integer> seeds = Arrays.asList(ArrayUtils.toObject(seedVertices));
+        for (int i = 0; i < graph.getVertexCount(); i++)
+          graph.getVertex(i).getValue().setSeedVertex(seeds.contains(i));
+      };
+
+    TestGraphUtils.runTest(
+      TestGraphUtils.chainModifiers(graphLoader, valueLoader),
+      (graph) -> {
+        for (int i = 0; i < expectedDistances.length; i++) {
+          assertEquals(expectedDistances[i], graph.getValue(i).getDistance());
+        }
+      },
+      (conf) -> {
+        BlockUtils.setBlockFactoryClass(conf, BreadthFirstSearchBlockFactory.class);
+      }
+    );
+  }
+
+  @Test
+  public void testSmall1SingleSeed() throws Exception {
+    int[] expected = {0, 1, 1, 2, 3, 3, -1};
+    int[] seeds = {0};
+    run(new Small1GraphInit<>(), expected, seeds);
+  }
+
+  @Test
+  public void testSmall1TwoSeeds() throws Exception {
+    int[] expected = {0, 1, 1, 1, 0, 1, -1};
+    int[] seeds = {0, 4};
+    run(new Small1GraphInit<>(), expected, seeds);
+  }
+
+  @Test
+  public void testSmall1IsolatedSeed() throws Exception {
+    int[] expected = {-1, -1, -1, -1, -1, -1, 0};
+    int[] seeds = {6};
+    run(new Small1GraphInit<>(), expected, seeds);
+  }
+
+  @Test
+  public void testSmallGraphTwoSeeds() throws Exception {
+    int[] expected = {0, 1, 2, 2, 2, 2, 3, 4, 5, 5, 5, 1, 2, 2, 2, 0};
+    int[] seeds = {0, 15};
+    run(new Graph1Init<>(), expected, seeds);
+  }
+
+  @Test
+  public void testSmallGraphTwoCloseSeeds() throws Exception {
+    int[] expected = {1, 0, 1, 0, 1, 1, 1, 2, 3, 3, 3, 2, 3, 3, 3, 3};
+    int[] seeds = {1, 3};
+    run(new Graph1Init<>(), expected, seeds);
+  }
+
+  @Test
+  public void testMultipleComponentGraphCloseSeeds() throws Exception {
+    int[] expected = {2, 1, 0, 1, 2, 3, 3, 3, 2, 2, 2, 2, 1, 0, 2, -1, -1, -1, -1, -1, -1};
+    int[] seeds = {13, 2};
+    run(new Graph2Init(), expected, seeds);
+  }
+
+  @Test
+  public void testMultipleComponentGraphFarSeeds() throws Exception {
+    int[] expected = {3, 2, 3, 2, 1, 0, 1, 2, 1, 2, 3, 3, 2, 3, 3, 3, 2, 1, 0, 1, -1};
+    int[] seeds = {5, 18};
+    run(new Graph2Init(), expected, seeds);
+  }
+
+
+  public class Graph1Init<I extends WritableComparable, V extends Writable, E extends Writable>
+      implements TestGraphModifier<I, V, E> {
+
+    @Override
+    public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+      graph.addVertex(0, (Number) null, null, 1);
+      graph.addVertex(1, (Number) null, null, 0,2,3,4,5);
+      graph.addVertex(2, (Number) null, null, 1,3,4,5);
+      graph.addVertex(3, (Number) null, null, 1,2,4,5,6);
+      graph.addVertex(4, (Number) null, null, 1,2,3,5);
+      graph.addVertex(5, (Number) null, null, 1,2,3,4,11);
+      graph.addVertex(6, (Number) null, null, 3,7);
+      graph.addVertex(7, (Number) null, null, 6,8,9,10);
+      graph.addVertex(8, (Number) null, null, 7,9,10);
+      graph.addVertex(9, (Number) null, null, 7,8,10);
+      graph.addVertex(10, (Number) null, null, 7,8,9);
+      graph.addVertex(11, (Number) null, null, 5,12,13,14,15);
+      graph.addVertex(12, (Number) null, null, 11);
+      graph.addVertex(13, (Number) null, null, 11);
+      graph.addVertex(14, (Number) null, null, 11);
+      graph.addVertex(15, (Number) null, null, 11);
+    }
+  }
+
+  public class Graph2Init<I extends WritableComparable, V extends Writable, E extends Writable>
+      implements TestGraphModifier<I, V, E> {
+
+    @Override
+    public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+      graph.addVertex(0, (Number) null, null, 1);
+      graph.addVertex(1, (Number) null, null, 0,2,3,4);
+      graph.addVertex(2, (Number) null, null, 1,3);
+      graph.addVertex(3, (Number) null, null, 2,4,9,10,11);
+      graph.addVertex(4, (Number) null, null, 1,3,5,6,7);
+      graph.addVertex(5, (Number) null, null, 4,6,5,8);
+      graph.addVertex(6, (Number) null, null, 4,5,7);
+      graph.addVertex(7, (Number) null, null, 4,5,6);
+      graph.addVertex(8, (Number) null, null, 5,9,12);
+      graph.addVertex(9, (Number) null, null, 3,8,10,11,12);
+      graph.addVertex(10, (Number) null, null, 3,9,11);
+      graph.addVertex(11, (Number) null, null, 3,9,10);
+      graph.addVertex(12, (Number) null, null, 8,9,13,14);
+      graph.addVertex(13, (Number) null, null, 12);
+      graph.addVertex(14, (Number) null, null, 12);
+      graph.addVertex(15, (Number) null, null, 16);
+      graph.addVertex(16, (Number) null, null, 15,17,19);
+      graph.addVertex(17, (Number) null, null, 16,18);
+      graph.addVertex(18, (Number) null, null, 17,19);
+      graph.addVertex(19, (Number) null, null, 16,18);
+      graph.addVertex(20);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java
new file mode 100644
index 0000000..fbd380b
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDistributedIndependentSet {
+
+  private void runTest(
+      TestGraphModifier<LongWritable, DistributedIndependentSetVertexValue, NullWritable>
+          graphLoader) throws Exception {
+    TestGraphUtils.runTest(graphLoader, (graph) -> {
+      checkDecomposition(graph);
+    }, (conf) -> {
+      BlockUtils.setBlockFactoryClass(conf, DistributedIndependentSetBlockFactory.class);
+    });
+  }
+
+  private static void checkDecomposition(
+      NumericTestGraph<LongWritable, DistributedIndependentSetVertexValue, NullWritable> graph) {
+    final int UNASSIGNED = -1;
+    final int HAS_EDGE_TO_IND_SET = -2;
+    // Hold (id -> List of vertices) for each independent set id
+    HashMap<Integer, ArrayList<Integer>> indSets = new HashMap<>();
+    int numVertices = graph.getVertexCount();
+
+    int numIndSets = 0;
+    for (int i = 0; i < numVertices; ++i) {
+      int indSetID = graph.getVertex(i).getValue().getIndependentSetID().get();
+      // Number of independent sets are less than or equal to the number of vertices in the input
+      // graph.
+      Assert.assertTrue(indSetID >= 0 && indSetID < numVertices);
+      // All tests assign (0, 1, ...) as independent set ids. The vertex assigned to the max id
+      // is a specifier for the total number of independent sets the input graph is decomposed
+      // into.
+      if (indSetID > numIndSets)
+        numIndSets = indSetID;
+      ArrayList<Integer> mapValue = indSets.get(indSetID);
+      if (mapValue == null) {
+        mapValue = new ArrayList<>();
+        indSets.put(indSetID, mapValue);
+      }
+      mapValue.add(i);
+    }
+    numIndSets++;
+
+    int[] label = new int[numVertices];
+    for (int i = 0; i < numVertices; ++i)
+      label[i] = UNASSIGNED;
+
+    for (int i = 0; i < numIndSets; ++i) {
+      ArrayList<Integer> indSet = indSets.get(i);
+
+      // All independent set ids are assigned consecutively starting from 0. There should be at
+      // least one vertex per assigned independent set.
+      Assert.assertTrue(indSet != null && indSet.size() > 0);
+      for (int v : indSet) {
+        // Check if vertices in this independent set is not assigned to any of the previous
+        // independent sets.
+        Assert.assertTrue(label[v] == UNASSIGNED);
+        label[v] = i;
+      }
+
+      for (int v : indSet) {
+        for (Edge<LongWritable, NullWritable> edge : graph.getVertex(v).getEdges()) {
+          int u = (int) edge.getTargetVertexId().get();
+          // Check all vertices in the current independent set do not have edge to each other.
+          Assert.assertTrue(label[u] != label[v]);
+          // Mark unassigned vertices neighboring this independent set. This is necessary to check
+          // if this independent set is 'maximal'.
+          if (label[u] == UNASSIGNED)
+            label[u] = HAS_EDGE_TO_IND_SET;
+        }
+      }
+
+      // Check if the independent set is maximal.
+      for (int j = 0; j < numVertices; ++j) {
+        Assert.assertTrue(label[j] != UNASSIGNED);
+        // Reset marked vertices neighboring to this independent set.
+        if (label[j] == HAS_EDGE_TO_IND_SET)
+          label[j] = UNASSIGNED;
+      }
+    }
+  }
+
+  private static void createVertices(
+      NumericTestGraph<LongWritable, DistributedIndependentSetVertexValue, NullWritable> graph,
+      int numVertices) {
+    for (int i = 0; i < numVertices; ++i)
+      graph.addVertex(i);
+  }
+
+  @Test
+  public void testSmallGraph() throws Exception {
+    /*
+     *   1      5
+     *  / \    / \    6
+     * 0---2--3---4
+     */
+    final int NUM_VERTICES = 7;
+    runTest((graph) -> {
+      createVertices(graph, NUM_VERTICES);
+      graph.addSymmetricEdge(0, 1);
+      graph.addSymmetricEdge(0, 2);
+      graph.addSymmetricEdge(1, 2);
+      graph.addSymmetricEdge(2, 3);
+      graph.addSymmetricEdge(3, 4);
+      graph.addSymmetricEdge(3, 5);
+      graph.addSymmetricEdge(4, 5);
+    });
+  }
+
+  @Test
+  public void testSmallGraphOrderingEffect() throws Exception {
+    /*
+     *   4      5
+     *  / \    / \    6
+     * 0---2--1---3
+     */
+    final int NUM_VERTICES = 7;
+    runTest((graph) -> {
+      createVertices(graph, NUM_VERTICES);
+      graph.addSymmetricEdge(0, 4);
+      graph.addSymmetricEdge(0, 2);
+      graph.addSymmetricEdge(2, 4);
+      graph.addSymmetricEdge(1, 2);
+      graph.addSymmetricEdge(1, 5);
+      graph.addSymmetricEdge(1, 3);
+      graph.addSymmetricEdge(3, 5);
+    });
+  }
+
+  @Test
+  public void testRingOdd() throws Exception {
+    final int NUM_VERTICES = 13;
+    runTest((graph) -> {
+      createVertices(graph, NUM_VERTICES);
+      for (int i = 1; i < NUM_VERTICES; ++i)
+        graph.addSymmetricEdge(i - 1, i);
+      graph.addSymmetricEdge(0, NUM_VERTICES - 1);
+    });
+  }
+
+  @Test
+  public void testStarGraph() throws Exception {
+    final int NUM_VERTICES = 15;
+    runTest((graph) -> {
+      createVertices(graph, NUM_VERTICES);
+      for (int i = 1; i < NUM_VERTICES; ++i)
+        graph.addSymmetricEdge(0, i);
+    });
+  }
+
+  @Test
+  public void testMultipleStarGraphs() throws Exception {
+    final int NUM_VERTICES1 = 15;
+    final int NUM_VERTICES2 = 21;
+    runTest((graph) -> {
+      createVertices(graph, NUM_VERTICES1 + NUM_VERTICES2);
+      for (int i = 1; i < NUM_VERTICES1; ++i)
+        graph.addSymmetricEdge(0, i);
+
+      for (int i = 1 + NUM_VERTICES1; i < NUM_VERTICES1 + NUM_VERTICES2; ++i)
+        graph.addSymmetricEdge(NUM_VERTICES1, i);
+    });
+  }
+
+  @Test
+  public void testMeshGraph() throws Exception {
+    final int M = 11;
+    final int N = 13;
+    runTest((graph) -> {
+      createVertices(graph, M * N);
+      for (int i = 0; i < M; ++i)
+        for (int j = 0; j < N; ++j) {
+          if (i != M - 1)
+            graph.addSymmetricEdge(i * N + j, (i + 1) * N + j);
+          if (j != N - 1)
+            graph.addSymmetricEdge(i * N + j, i * N + (j + 1));
+        }
+    });
+  }
+
+  @Test
+  public void testCompleteGraph() throws Exception {
+    final int NUM_VERTICES = 17;
+    runTest((graph) -> {
+      createVertices(graph, NUM_VERTICES);
+      for (int i = 0; i < NUM_VERTICES; ++i)
+        for (int j = i + 1; j < NUM_VERTICES; ++j)
+          graph.addSymmetricEdge(i, j);
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java
new file mode 100644
index 0000000..1f5ece6
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.coarsening;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.library.ReusableSuppliers;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCoarseningUtils {
+
+  public static class TestCoarseningUtilsBlockFactory extends AbstractBlockFactory<Object> {
+    @Override
+    public Block createBlock(GiraphConfiguration conf) {
+      return CoarseningUtils.<IntWritable, IntWritable, IntWritable>createCoarseningBlock(
+          (ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, IntWritable>) conf,
+          ReusableSuppliers.fromInt((vertex) -> {
+            int id = vertex.getId().get();
+            if (id == 0 || id == 1) {
+              return -1;
+            } else if (id == 2 || id == 3) {
+              return -2;
+            } else if (id == 4 || id == 5) {
+              return -4;
+            } else return -id;
+          }),
+          id -> id.get() >= 0,
+          id -> id.get() < 0);
+    }
+
+    @Override
+    public Object createExecutionStage(GiraphConfiguration conf) {
+      return new Object();
+    }
+
+    @Override
+    protected Class<IntWritable> getVertexIDClass(GiraphConfiguration conf) {
+      return IntWritable.class;
+    }
+
+    @Override
+    protected Class<IntWritable> getVertexValueClass(GiraphConfiguration conf) {
+      return IntWritable.class;
+    }
+
+    @Override
+    protected Class<IntWritable> getEdgeValueClass(GiraphConfiguration conf) {
+      return IntWritable.class;
+    }
+  }
+
+  @Test
+  public void testSmallGraph() throws Exception {
+    /* We take small graph:
+     *
+     *   1      5
+     *  / \    / \    6
+     * 0---2--3---4
+     *
+     * And coarsen it into 4 groups: (0,1), (2,3), (4,5), (6)
+     */
+
+    TestGraphUtils.<IntWritable, IntWritable, IntWritable>runTest(
+        TestGraphUtils.chainModifiers(
+            new Small1GraphInit<>(() -> new IntWritable(1)),
+            new EachVertexInit<>((vertex) -> vertex.getValue().set(1))),
+        (graph) -> {
+          for (int i : new int[] {-1, -4}) {
+            Assert.assertEquals(2, graph.getValue(i).get());
+            Assert.assertEquals(2, graph.getVertex(i).getNumEdges());
+
+            Map<Integer, Integer> edges = edgesToMap(graph.getVertex(i).getEdges());
+            // self loop
+            Assert.assertEquals(2, edges.get(i).intValue());
+
+            Assert.assertEquals(2, edges.get(-2).intValue());
+          }
+          for (int i : new int[] {-2}) {
+            Assert.assertEquals(2, graph.getValue(i).get());
+            Assert.assertEquals(3, graph.getVertex(i).getNumEdges());
+
+            Map<Integer, Integer> edges = edgesToMap(graph.getVertex(i).getEdges());
+            // self loop
+            Assert.assertEquals(2, edges.get(i).intValue());
+
+            Assert.assertEquals(2, edges.get(-1).intValue());
+            Assert.assertEquals(2, edges.get(-4).intValue());
+          }
+          for (int i : new int[] {-6}) {
+            Assert.assertEquals(1, graph.getValue(i).get());
+            Assert.assertEquals(0, graph.getVertex(i).getNumEdges());
+          }
+        },
+        (conf) -> {
+          BlockUtils.setBlockFactoryClass(conf, TestCoarseningUtilsBlockFactory.class);
+        });
+  }
+
+  private static Map<Integer, Integer> edgesToMap(Iterable<Edge<IntWritable, IntWritable>> edges) {
+    Map<Integer, Integer> map = new HashMap<>();
+    for(Edge<IntWritable, IntWritable> edge : edges) {
+      Assert.assertNull(map.put(edge.getTargetVertexId().get(), edge.getValue().get()));
+    }
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java b/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
new file mode 100644
index 0000000..d8b4cc1
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
@@ -0,0 +1,174 @@
+package org.apache.giraph.writable.kryo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.library.striping.StripingUtils;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.Predicate;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.primitive.Obj2IntFunction;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class KryoWritableWrapperJava8Test {
+  // Copy from KryoWritableWrapperTest, since we cannot extend it, since tests are not in jars
+  public static <T> T kryoSerDeser(T t) throws IOException {
+    KryoWritableWrapper<T> wrapped = new KryoWritableWrapper<>(t);
+    KryoWritableWrapper<T> deser = new KryoWritableWrapper<>();
+    WritableUtils.copyInto(wrapped, deser, true);
+    return deser.get();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testNonSerializableLambda() throws IOException {
+    Runnable nonCapturing = () -> System.out.println("works");
+    kryoSerDeser(nonCapturing).run();
+  }
+
+  @Test
+  public void testLambda() throws IOException {
+    Runnable nonCapturing = (Runnable & Serializable) () -> System.out.println("works");
+    kryoSerDeser(nonCapturing).run();
+
+    String works = "works";
+    Runnable capturing = (Runnable & Serializable) () -> System.out.println(works);
+    kryoSerDeser(capturing).run();
+  }
+
+  @Test
+  public void testLambdaCapturingThisRef() throws IOException {
+    KryoWritableWrapperJava8Test o = this;
+    Runnable capturingThisRef = (Runnable & Serializable) () ->
+      System.out.println(o);
+    kryoSerDeser(capturingThisRef).run();
+  }
+
+  @Test
+  public void testLambdaCapturingThis() throws IOException {
+    Runnable capturingThis = (Runnable & Serializable) () ->
+      System.out.println(this);
+    kryoSerDeser(capturingThis).run();
+  }
+
+  @Test
+  public void testLambdaCapturingLambda() throws IOException {
+    Supplier<Boolean> nonCapturing = () -> true;
+    Runnable capturingLambda = (Runnable & Serializable) () ->
+      System.out.println(nonCapturing);
+    kryoSerDeser(capturingLambda).run();
+  }
+
+  @Test
+  public void testLambdaCapturingLambdaWithCapture() throws IOException {
+    boolean trueVar = new Random().nextDouble() < 1;
+    Supplier<Boolean> capturing = () -> trueVar;
+    Runnable capturingLambda = (Runnable & Serializable) () ->
+      System.out.println(capturing + " " + trueVar);
+    kryoSerDeser(capturingLambda).run();
+  }
+
+
+  @Test
+  public void testLambdaFunctions() throws IOException {
+    Supplier<Boolean> nonCapturing = () -> true;
+    Assert.assertTrue(kryoSerDeser(nonCapturing).get());
+
+    boolean trueVar = new Random().nextDouble() < 1;
+    Supplier<Boolean> capturing = () -> trueVar;
+    Assert.assertTrue(kryoSerDeser(capturing).get());
+  }
+
+  @Test
+  public void testLambdasFromCode() throws IOException {
+    Assert.assertNotNull(kryoSerDeser(StripingUtils.fastHashStriping(3)));
+    Assert.assertNotNull(kryoSerDeser(StripingUtils.fastHashStripingPredicate(3)));
+
+    Assert.assertNotNull(kryoSerDeser(
+        (Int2ObjFunction<Obj2IntFunction<LongWritable>>) StripingUtils::fastHashStriping));
+
+    Int2ObjFunction<Int2ObjFunction<Predicate<LongWritable>>> stripingPredicate = StripingUtils::fastHashStripingPredicate;
+    Assert.assertNotNull(kryoSerDeser(stripingPredicate));
+
+    Assert.assertNotNull(kryoSerDeser(stripingPredicate.apply(3)));
+
+    Assert.assertNotNull(kryoSerDeser(StripingUtils.fastHashStripingPredicate(3).apply(2)));
+
+    Assert.assertNotNull(kryoSerDeser(stripingPredicate.apply(3).apply(2)));
+
+    Predicate<LongWritable> predicate = stripingPredicate.apply(3).apply(2);
+    Assert.assertNotNull(kryoSerDeser(predicate));
+
+    Runnable capturingLambda = (Runnable & Serializable) () ->
+      System.out.println(predicate.apply(new LongWritable()));
+    kryoSerDeser(capturingLambda).run();
+
+    Assert.assertNotNull(kryoSerDeser(
+      StripingUtils.generateStripedBlock(
+          5,
+          (filter) -> new Block() {
+            private final Predicate<LongWritable> test = filter;
+            {
+              Assert.assertTrue(filter instanceof Serializable);
+            }
+
+            @Override
+            public Iterator<AbstractPiece> iterator() {
+              return null;
+            }
+
+            @Override
+            public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { }
+          })));
+  }
+
+  @Test
+  public void testLambdaCapturingSameReference() throws IOException {
+    ObjectTransfer<Integer> transfer = new ObjectTransfer<>();
+
+    Consumer<Integer> consumer = (t) -> transfer.apply(t);
+    Supplier<Integer> supplier = () -> transfer.get();
+
+    class TwoLambdaObject {
+      Consumer<Integer> consumer;
+      Supplier<Integer> supplier;
+
+      public TwoLambdaObject(Consumer<Integer> consumer, Supplier<Integer> supplier) {
+        this.consumer = consumer;
+        this.supplier = supplier;
+      }
+    }
+
+    TwoLambdaObject object = new TwoLambdaObject(consumer, supplier);
+    // test transfer before serialization
+    object.consumer.apply(5);
+    Assert.assertEquals(5, object.supplier.get().intValue());
+
+    // test transfer through serialization
+    object.consumer.apply(6);
+    TwoLambdaObject deser = kryoSerDeser(object);
+    Assert.assertEquals(6, deser.supplier.get().intValue());
+
+    // test that after serialization, both lambdas point to the same object
+    deser.consumer.apply(4);
+    Assert.assertEquals(4, deser.supplier.get().intValue());
+  }
+
+  // Bug in Java, have test to know when it becomes fixed
+  @Test //(expected=RuntimeException.class)
+  public void testNestedLambda() throws IOException {
+    Int2ObjFunction<Int2ObjFunction<Integer>> f = (x) -> (y) -> x+y;
+    Assert.assertNotNull(kryoSerDeser(f));
+    Assert.assertNotNull(kryoSerDeser(f.apply(0)));
+    Assert.assertNotNull(kryoSerDeser(f.apply(0).apply(1)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
index ad6ca15..d582cb2 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -144,6 +144,8 @@ public class LocalBlockRunner {
       Block block, Object executionStage, TestGraph<I, V, E> graph,
       final VertexSaver<I, V, E> vertexSaver
   ) {
+    Preconditions.checkNotNull(block);
+    Preconditions.checkNotNull(graph);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
     int numWorkers = NUM_THREADS.get(conf);
     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
index 88b78a3..d926cdd 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
@@ -32,6 +32,7 @@ import org.apache.giraph.block_app.library.internal.SendMessagePiece;
 import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.PairConsumer;
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 import org.apache.giraph.graph.Vertex;
@@ -170,10 +171,38 @@ public class Pieces {
   <S, R extends Writable, I extends WritableComparable, V extends Writable,
   E extends Writable>
   Piece<I, V, E, NoMessage, Object> reduce(
+      String name,
+      ReduceOperation<S, R> reduceOp,
+      SupplierFromVertex<I, V, E, S> valueSupplier,
+      final Consumer<R> reducedValueConsumer) {
+    return reduceWithMaster(
+        name, reduceOp, valueSupplier,
+        new PairConsumer<R, BlockMasterApi>() {
+          @Override
+          public void apply(R input, BlockMasterApi master) {
+            reducedValueConsumer.apply(input);
+          }
+        });
+  }
+
+  /**
+   * Creates single reducer piece - given reduce class, supplier of values on
+   * worker, reduces and passes the result to given consumer on master.
+   *
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+  E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduceWithMaster(
       final String name,
       final ReduceOperation<S, R> reduceOp,
       final SupplierFromVertex<I, V, E, S> valueSupplier,
-      final Consumer<R> reducedValueConsumer) {
+      final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
     return new Piece<I, V, E, NoMessage, Object>() {
       private ReducerHandle<S, R> handle;
 
@@ -197,7 +226,7 @@ public class Pieces {
 
       @Override
       public void masterCompute(BlockMasterApi master, Object executionStage) {
-        reducedValueConsumer.apply(handle.getReducedValue(master));
+        reducedValueConsumer.apply(handle.getReducedValue(master), master);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
index b606a34..d1efd5b 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
@@ -19,12 +19,14 @@ package org.apache.giraph.block_app.library;
 
 import java.util.Iterator;
 
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 import org.apache.giraph.block_app.framework.block.Block;
 import org.apache.giraph.block_app.framework.block.SequenceBlock;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.function.Consumer;
 import org.apache.giraph.function.Function;
 import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.PairConsumer;
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 import org.apache.giraph.function.vertex.FunctionWithVertex;
 import org.apache.giraph.function.vertex.SupplierFromVertex;
@@ -145,6 +147,18 @@ public class SendMessageChain<I extends WritableComparable, V extends Writable,
   }
 
   /**
+   * Start chain by providing a function that will produce Block representing
+   * beginning of the chain, given a consumer of messages send
+   * by the last link in the created block.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, P extends Writable>
+  SendMessageChain<I, V, E, P> startCustom(
+      Function<ConsumerWithVertex<I, V, E, P>, Block> createStartingBlock) {
+    return new SendMessageChain<>(createStartingBlock);
+  }
+
+  /**
    * Give previously received message(s) to messageSupplier, and send message
    * it returns to all targets provided by targetsSupplier.
    */
@@ -245,23 +259,51 @@ public class SendMessageChain<I extends WritableComparable, V extends Writable,
    * by reducedValueConsumer.
    */
   public <S, R extends Writable>
-  Block endReduce(String name, ReduceOperation<S, R> reduceOp,
+  Block endReduce(final String name, final ReduceOperation<S, R> reduceOp,
       final FunctionWithVertex<I, V, E, P, S> valueSupplier,
-      Consumer<R> reducedValueConsumer) {
-    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+      final Consumer<R> reducedValueConsumer) {
+    return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
+      @Override
+      public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
+        return Pieces.reduce(
+            name,
+            reduceOp,
+            new SupplierFromVertex<I, V, E, S>() {
+              @Override
+              public S get(Vertex<I, V, E> vertex) {
+                return valueSupplier.apply(vertex, prevMessages.get(vertex));
+              }
+            },
+            reducedValueConsumer);
+      }
+    });
+  }
 
-    return new SequenceBlock(
-      blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
-      Pieces.reduce(
-        name,
-        reduceOp,
-        new SupplierFromVertex<I, V, E, S>() {
-          @Override
-          public S get(Vertex<I, V, E> vertex) {
-            return valueSupplier.apply(vertex, prevMessagesTransfer.get());
-          }
-        },
-        reducedValueConsumer));
+  /**
+   * End chain by giving received messages to valueSupplier,
+   * to produce value that should be reduced, and consumed on master
+   * by reducedValueConsumer.
+   */
+  public <S, R extends Writable>
+  Block endReduceWithMaster(
+      final String name, final ReduceOperation<S, R> reduceOp,
+      final FunctionWithVertex<I, V, E, P, S> valueSupplier,
+      final PairConsumer<R, BlockMasterApi> reducedValueConsumer) {
+    return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() {
+      @Override
+      public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) {
+        return Pieces.reduceWithMaster(
+            name,
+            reduceOp,
+            new SupplierFromVertex<I, V, E, S>() {
+              @Override
+              public S get(Vertex<I, V, E> vertex) {
+                return valueSupplier.apply(vertex, prevMessages.get(vertex));
+              }
+            },
+            reducedValueConsumer);
+      }
+    });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
new file mode 100644
index 0000000..83ef3fd
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.gc;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Dummy piece to hint System.gc()
+ */
+public class WorkerGCPiece extends PieceWithWorkerContext<WritableComparable,
+    Writable, Writable, NoMessage, Object, NoMessage, Object>  {
+
+  @Override
+  @SuppressFBWarnings(value = "DM_GC")
+  public void workerContextSend(
+      BlockWorkerContextSendApi<NoMessage> workerContextApi,
+      Object executionStage,
+      Object workerValue) {
+    System.gc();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java
new file mode 100644
index 0000000..bdf5ded
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * GC utility pieces.
+ */
+package org.apache.giraph.block_app.library.gc;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java
new file mode 100644
index 0000000..8b3c0a3
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+import java.io.Serializable;
+
+/**
+ * Primitive specialization of Function:
+ * (double) -> T
+ *
+ * @param <T> Result type
+ */
+public interface Double2ObjFunction<T> extends Serializable {
+  /**
+   * Returns the result of applying this function to given {@code input}.
+   *
+   * The returned object may or may not be a new instance,
+   * depending on the implementation.
+   */
+  T apply(double input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java
new file mode 100644
index 0000000..4725d4b
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+import java.io.Serializable;
+
+
+/**
+ * Primitive specialization of Function:
+ * (double) -> void
+ */
+public interface DoubleConsumer extends Serializable {
+  /**
+   * Applies this function to {@code input}
+   */
+  void apply(double input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java
new file mode 100644
index 0000000..5a7c101
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.combiner;
+
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.FloatTypeOps;
+import org.apache.giraph.types.ops.IntTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Message combiner which calculates max of all messages.
+ *
+ * @param <M> Message type
+ */
+public class MaxMessageCombiner<M extends WritableComparable>
+    implements MessageCombiner<WritableComparable, M> {
+  /** DoubleWritable specialization */
+  public static final MaxMessageCombiner<DoubleWritable> DOUBLE =
+      new MaxMessageCombiner<>(DoubleTypeOps.INSTANCE);
+  /** DoubleWritable specialization */
+  public static final MaxMessageCombiner<FloatWritable> FLOAT =
+      new MaxMessageCombiner<>(FloatTypeOps.INSTANCE);
+  /** LongWritable specialization */
+  public static final MaxMessageCombiner<LongWritable> LONG =
+      new MaxMessageCombiner<>(LongTypeOps.INSTANCE);
+  /** IntWritable specialization */
+  public static final MaxMessageCombiner<IntWritable> INT =
+      new MaxMessageCombiner<>(IntTypeOps.INSTANCE);
+
+  /** Value type operations */
+  private final NumericTypeOps<M> typeOps;
+
+  /**
+   * Constructor
+   * @param typeOps Value type operations
+   */
+  public MaxMessageCombiner(NumericTypeOps<M> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public void combine(
+      WritableComparable vertexIndex, M originalMessage, M messageToCombine) {
+    if (originalMessage.compareTo(messageToCombine) < 0) {
+      typeOps.set(originalMessage, messageToCombine);
+    }
+  }
+
+  @Override
+  public M createInitialMessage() {
+    return typeOps.createZero();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java
new file mode 100644
index 0000000..1aea293
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.writable.tuple.PairWritable;
+import org.apache.hadoop.io.Writable;
+import org.python.google.common.base.Preconditions;
+
+/**
+ * Combines two individual reducers, to create a single reducer of pairs that
+ * reduces each of them individually.
+ *
+ * @param <S1> First single value type
+ * @param <R1> First reduced value type
+ * @param <S2> Second single value type
+ * @param <R2> Second reduced value type
+ */
+public class PairReduce<S1, R1 extends Writable, S2, R2 extends Writable>
+    implements ReduceOperation<Pair<S1, S2>, PairWritable<R1, R2>> {
+  /** First reduceOp */
+  private ReduceOperation<S1, R1> reduce1;
+  /** Second reduceOp */
+  private ReduceOperation<S2, R2> reduce2;
+
+  /** Constructor */
+  public PairReduce() {
+  }
+
+  /**
+   * Constructor
+   * @param reduce1 First reduceOp
+   * @param reduce2 Second reduceOp
+   */
+  public PairReduce(
+      ReduceOperation<S1, R1> reduce1, ReduceOperation<S2, R2> reduce2) {
+    this.reduce1 = reduce1;
+    this.reduce2 = reduce2;
+  }
+
+
+  @Override
+  public PairWritable<R1, R2> createInitialValue() {
+    return new PairWritable<>(
+        reduce1.createInitialValue(), reduce2.createInitialValue());
+  }
+
+  @Override
+  public PairWritable<R1, R2> reduce(
+      PairWritable<R1, R2> curValue, Pair<S1, S2> valueToReduce) {
+    Preconditions.checkState(
+        curValue.getLeft() ==
+        reduce1.reduce(curValue.getLeft(), valueToReduce.getLeft()));
+    Preconditions.checkState(
+        curValue.getRight() ==
+        reduce2.reduce(curValue.getRight(), valueToReduce.getRight()));
+    return curValue;
+  }
+
+  @Override
+  public PairWritable<R1, R2> reduceMerge(
+      PairWritable<R1, R2> curValue, PairWritable<R1, R2> valueToReduce) {
+    Preconditions.checkState(
+        curValue.getLeft() ==
+        reduce1.reduceMerge(curValue.getLeft(), valueToReduce.getLeft()));
+    Preconditions.checkState(
+        curValue.getRight() ==
+        reduce2.reduceMerge(curValue.getRight(), valueToReduce.getRight()));
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeWritableObject(reduce1, out);
+    WritableUtils.writeWritableObject(reduce2, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    reduce1 = WritableUtils.readWritableObject(in, null);
+    reduce2 = WritableUtils.readWritableObject(in, null);
+  }
+}


Mime
View raw message