giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ikabi...@apache.org
Subject git commit: updated refs/heads/trunk to 8cb309d
Date Fri, 27 Jan 2017 19:34:25 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk e61cbfddb -> 8cb309da8


GIRAPH-1130 Fix RepeatUntilBlock

closes #16


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8cb309da
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8cb309da
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8cb309da

Branch: refs/heads/trunk
Commit: 8cb309da868afdc03f0dbddbdd18bbc0a6a1eb90
Parents: e61cbfd
Author: Igor Kabiljo <ikabiljo@fb.com>
Authored: Fri Jan 27 11:34:06 2017 -0800
Committer: Igor Kabiljo <ikabiljo@fb.com>
Committed: Fri Jan 27 11:34:06 2017 -0800

----------------------------------------------------------------------
 .../framework/block/RepeatUntilBlock.java       | 20 ++++++------
 .../apache/giraph/block_app/library/Pieces.java | 17 +++++++++++
 .../block_app/framework/BlockExecutionTest.java | 32 ++++++++++++++++++++
 .../framework/block/BlockTestingUtils.java      | 13 +++++---
 .../framework/block/TestRepeatUntilBlock.java   | 30 ++++++++++++++++++
 5 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8cb309da/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
index 13e8833..ba76307 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
@@ -17,7 +17,6 @@
  */
 package org.apache.giraph.block_app.framework.block;
 
-import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
@@ -25,7 +24,7 @@ import org.apache.giraph.function.Consumer;
 import org.apache.giraph.function.Supplier;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 
 /**
  * Block that repeats another block until toQuit supplier returns true,
@@ -56,19 +55,18 @@ public final class RepeatUntilBlock implements Block {
 
   @Override
   public Iterator<AbstractPiece> iterator() {
-    // nCopies uses constant memory, creating a looped list with single element
-    final Iterator<AbstractPiece> repeatIterator =
-        Iterables.concat(Collections.nCopies(repeatTimes, block)).iterator();
-    return new AbstractIterator<AbstractPiece>() {
+    return Iterators.concat(new AbstractIterator<Iterator<AbstractPiece>>() {
+      private int index = 0;
+
       @Override
-      protected AbstractPiece computeNext() {
-        if (Boolean.TRUE.equals(toQuit.get()) || !repeatIterator.hasNext()) {
+      protected Iterator<AbstractPiece> computeNext() {
+        if (index >= repeatTimes || Boolean.TRUE.equals(toQuit.get())) {
           return endOfData();
         }
-
-        return repeatIterator.next();
+        index++;
+        return block.iterator();
       }
-    };
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8cb309da/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
index d926cdd..614f4ba 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
@@ -82,6 +82,23 @@ public class Pieces {
   }
 
   /**
+   * Execute given function on master.
+   */
+  public static
+  Piece<WritableComparable, Writable,  Writable, NoMessage,
+    Object> masterCompute(
+      final String pieceName, final Consumer<BlockMasterApi> process) {
+    return new Piece<WritableComparable, Writable,  Writable, NoMessage,
+        Object>() {
+      @Override
+      public void masterCompute(
+          BlockMasterApi masterApi, Object executionStage) {
+        process.apply(masterApi);
+      }
+    };
+  }
+
+  /**
    * For each vertex execute given process function.
    * Computation is happening in the receive phase of the returned Piece.
    * This function should be used if you need returned Piece to interact with

http://git-wip-us.apache.org/repos/asf/giraph/blob/8cb309da/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
index 021a24c..00c5fb1 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
@@ -22,12 +22,19 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
 import org.apache.giraph.block_app.framework.piece.Piece;
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.reducers.impl.SumReduce;
 import org.apache.giraph.types.NoMessage;
@@ -175,4 +182,29 @@ public class BlockExecutionTest {
     Assert.assertNull(graph.getVertex(new LongWritable(3)));
     Assert.assertNotNull(graph.getVertex(new LongWritable(4)));
   }
+
+  @Test
+  public void testRepeatUntilBlockFinishCurrentLoop() throws Exception {
+    final ObjectTransfer<Boolean> toQuit = new ObjectTransfer<>();
+    final IntRef counter = new IntRef(5);
+    Block counterPiece = Pieces.masterCompute("Count", new Consumer<BlockMasterApi>() {
+      @Override
+      public void apply(BlockMasterApi input) {
+        counter.value--;
+        if (counter.value == 0) {
+          toQuit.apply(true);
+        }
+      }
+    });
+    Block innerBlock = new SequenceBlock(counterPiece, counterPiece, counterPiece, counterPiece);
+    Block repeatBlock = RepeatUntilBlock.unlimited(
+      innerBlock,
+      toQuit
+    );
+
+    LocalBlockRunner.runBlock(createTestGraph(), repeatBlock, new Object());
+
+    Assert.assertEquals(-3, counter.value);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8cb309da/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
index 6487d95..6384cfa 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
@@ -18,7 +18,6 @@
 package org.apache.giraph.block_app.framework.block;
 
 import static org.junit.Assert.assertEquals;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -30,6 +29,8 @@ import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class BlockTestingUtils {
 
@@ -38,11 +39,13 @@ public class BlockTestingUtils {
   private static final int NUM_TRIALS = 10;
   private static final int REPEAT_TIMES = 10;
 
-  private static int testSequential(Iterator<? extends AbstractPiece> referenceImpl,
-                                    Iterator<? extends AbstractPiece> testImpl) {
+  public static int testSequential(
+      Iterable<? extends AbstractPiece> referenceImpl,
+      Iterable<? extends AbstractPiece> testImpl) {
     int length = 0;
 
-    CheckIterator checkIterator = new CheckIterator(referenceImpl, testImpl);
+    CheckIterator checkIterator = new CheckIterator(
+        referenceImpl.iterator(), testImpl.iterator());
     while (checkIterator.hasNext()) {
       checkIterator.next();
       length++;
@@ -99,7 +102,7 @@ public class BlockTestingUtils {
    */
   public static void testIndependence(Iterable<? extends AbstractPiece> referenceImpl,
                                       Iterable<? extends AbstractPiece> testImpl) {
-    int length = testSequential(referenceImpl.iterator(), testImpl.iterator());
+    int length = testSequential(referenceImpl, testImpl);
     testRandom(length, referenceImpl, testImpl);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8cb309da/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
index 242d376..f69e4ac 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.Piece;
 import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
@@ -60,6 +61,35 @@ public class TestRepeatUntilBlock {
   }
 
   @Test
+  public void testRepeatUntilBlockBasicExit() throws Exception {
+    testRepeatUntilBlockBasicExit(1, 1);
+    testRepeatUntilBlockBasicExit(1, 3);
+    testRepeatUntilBlockBasicExit(3, 1);
+    testRepeatUntilBlockBasicExit(3, 2);
+    testRepeatUntilBlockBasicExit(4, 7);
+  }
+
+  private void testRepeatUntilBlockBasicExit(int inner, int outer) {
+    final IntRef counter = new IntRef(outer + 1);
+    Supplier<Boolean> countDown = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        counter.value--;
+        return counter.value == 0;
+      }
+    };
+    Piece piece = new Piece();
+    Block repeatBlock = new RepeatUntilBlock(
+      outer + 4,
+      new RepeatBlock(inner, piece),
+      countDown
+    );
+    BlockTestingUtils.testSequential(
+      Iterables.concat(Collections.nCopies(outer * inner, piece)),
+      repeatBlock);
+  }
+
+  @Test
   public void testNestedRepeatUntilBlock() throws Exception {
     Piece piece1 = new Piece();
     Piece piece2 = new Piece();


Mime
View raw message