giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to d4596a0
Date Wed, 10 May 2017 14:32:01 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 4f9c6c24a -> d4596a029


JIRA-1146

closes #36


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d4596a02
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d4596a02
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d4596a02

Branch: refs/heads/trunk
Commit: d4596a029984865ace68fab981047bed1f1d1494
Parents: 4f9c6c2
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Wed May 10 07:31:15 2017 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Wed May 10 07:31:15 2017 -0700

----------------------------------------------------------------------
 .../kryo/KryoWritableWrapperJava8Test.java      |  6 ++
 .../giraph/block_app/framework/BlockUtils.java  |  6 ++
 .../giraph/block_app/framework/block/Block.java |  8 ++
 .../block_app/framework/block/EmptyBlock.java   |  5 ++
 .../framework/block/FilteringBlock.java         |  5 ++
 .../block_app/framework/block/IfBlock.java      |  8 ++
 .../block_app/framework/block/PieceCount.java   | 91 ++++++++++++++++++++
 .../block_app/framework/block/RepeatBlock.java  | 10 +++
 .../framework/block/RepeatUntilBlock.java       |  5 ++
 .../framework/block/SequenceBlock.java          |  9 ++
 .../framework/piece/AbstractPiece.java          |  6 ++
 .../framework/piece/delegate/DelegatePiece.java |  6 ++
 .../migration/MigrationFullBlockFactory.java    |  6 ++
 .../block_app/framework/BlockApiHandleTest.java |  6 ++
 .../block_app/framework/block/TestIfBlock.java  | 16 ++++
 .../framework/block/TestRepeatBlock.java        |  2 +
 .../framework/block/TestRepeatUntilBlock.java   |  3 +
 .../org/apache/giraph/conf/GiraphConstants.java |  4 +
 .../giraph/job/CombinedWorkerProgress.java      | 12 ++-
 19 files changed, 212 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java b/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
index 1f528f0..3c706c9 100644
--- a/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.Random;
 
 import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.PieceCount;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.library.striping.StripingUtils;
 import org.apache.giraph.function.Consumer;
@@ -145,6 +146,11 @@ public class KryoWritableWrapperJava8Test {
 
             @Override
             public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { }
+
+            @Override
+            public PieceCount getPieceCount() {
+              return PieceCount.createUnknownCount();
+            }
           })));
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
index 49f23c5..1906eeb 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
@@ -26,6 +26,7 @@ import org.apache.giraph.block_app.framework.api.giraph.BlockComputation;
 import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
 import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
 import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.PieceCount;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.Piece;
 import org.apache.giraph.conf.BooleanConfOption;
@@ -147,6 +148,11 @@ public class BlockUtils {
     checkBlockTypes(
         executionBlock, blockFactory.createExecutionStage(immConf), immConf);
 
+    PieceCount pieceCount = executionBlock.getPieceCount();
+    if (pieceCount.isKnown()) {
+      GiraphConstants.SUPERSTEP_COUNT.set(conf, pieceCount.getCount() + 1);
+    }
+
     // check for non 'static final' fields in BlockFactories
     Class<?> bfClass = blockFactory.getClass();
     while (!bfClass.equals(Object.class)) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
index 6d5287c..fd1ec70 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
@@ -56,4 +56,12 @@ public interface Block extends Iterable<AbstractPiece> {
    * without actually executing them.
    */
   void forAllPossiblePieces(Consumer<AbstractPiece> consumer);
+
+  /**
+   * How many pieces are in this block.
+   * Sometimes we don't know (eg RepeatBlock).
+   *
+   * @return How many pieces are in this block.
+   */
+  PieceCount getPieceCount();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
index 1a57402..0701564 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
@@ -36,4 +36,9 @@ public final class EmptyBlock implements Block {
   @Override
   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
   }
+
+  @Override
+  public PieceCount getPieceCount() {
+    return new PieceCount(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
index 5631417..2cea9cb 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
@@ -110,4 +110,9 @@ public final class FilteringBlock<I extends WritableComparable,
   public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
     block.forAllPossiblePieces(consumer);
   }
+
+  @Override
+  public PieceCount getPieceCount() {
+    return block.getPieceCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
index e73392d..a72d343 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
@@ -61,6 +61,14 @@ public final class IfBlock implements Block {
   }
 
   @Override
+  public PieceCount getPieceCount() {
+    PieceCount thenCount = thenBlock.getPieceCount();
+    PieceCount elseCount = elseBlock.getPieceCount();
+    return thenCount.equals(elseCount) ?
+        thenCount : PieceCount.createUnknownCount();
+  }
+
+  @Override
   public String toString() {
     if (elseBlock instanceof EmptyBlock) {
       return "IfBlock(" + thenBlock + ")";

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/PieceCount.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/PieceCount.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/PieceCount.java
new file mode 100644
index 0000000..083e1aa
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/PieceCount.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.block_app.framework.block;
+
+import com.google.common.base.Objects;
+
+/**
+ * Number of pieces
+ */
+public class PieceCount {
+  private boolean known;
+  private int count;
+
+  public PieceCount(int count) {
+    known = true;
+    this.count = count;
+  }
+
+  private PieceCount() {
+    known = false;
+  }
+
+  public static PieceCount createUnknownCount() {
+    return new PieceCount();
+  }
+
+
+  public PieceCount add(PieceCount other) {
+    if (!this.known || !other.known) {
+      known = false;
+    } else {
+      count += other.count;
+    }
+    return this;
+  }
+
+  public PieceCount multiply(int value) {
+    count *= value;
+    return this;
+  }
+
+  public int getCount() {
+    if (known) {
+      return count;
+    } else {
+      throw new IllegalStateException(
+          "Can't get superstep count when it's unknown");
+    }
+  }
+
+  public boolean isKnown() {
+    return known;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof PieceCount) {
+      PieceCount other = (PieceCount) obj;
+      if (known) {
+        return other.known && other.count == count;
+      } else {
+        return !other.known;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(known, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
index 9f4f4a0..40d5bfe 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
@@ -32,10 +32,12 @@ import com.google.common.collect.Iterables;
 @SuppressWarnings("rawtypes")
 public final class RepeatBlock implements Block {
   private final Block block;
+  private final boolean constantRepeatTimes;
   private final IntSupplier repeatTimes;
 
   public RepeatBlock(final int repeatTimes, Block block) {
     this.block = block;
+    this.constantRepeatTimes = true;
     this.repeatTimes = new IntSupplier() {
       @Override
       public int get() {
@@ -56,6 +58,7 @@ public final class RepeatBlock implements Block {
    */
   public RepeatBlock(IntSupplier repeatTimes, Block block) {
     this.block = block;
+    this.constantRepeatTimes = false;
     this.repeatTimes = repeatTimes;
   }
 
@@ -81,6 +84,13 @@ public final class RepeatBlock implements Block {
   }
 
   @Override
+  public PieceCount getPieceCount() {
+    return constantRepeatTimes ?
+        block.getPieceCount().multiply(repeatTimes.get()) :
+        PieceCount.createUnknownCount();
+  }
+
+  @Override
   public String toString() {
     return "RepeatBlock(" + repeatTimes + " * " + block + ")";
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
index ba76307..99d38e8 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
@@ -75,6 +75,11 @@ public final class RepeatUntilBlock implements Block {
   }
 
   @Override
+  public PieceCount getPieceCount() {
+    return PieceCount.createUnknownCount();
+  }
+
+  @Override
   public String toString() {
     return "RepeatUntilBlock(" + repeatTimes + " * " + block + ")";
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
index d768f0b..6268e08 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
@@ -54,6 +54,15 @@ public final class SequenceBlock implements Block {
   }
 
   @Override
+  public PieceCount getPieceCount() {
+    PieceCount ret = new PieceCount(0);
+    for (Block block : blocks) {
+      ret.add(block.getPieceCount());
+    }
+    return ret;
+  }
+
+  @Override
   public String toString() {
     return "SequenceBlock" + Arrays.toString(blocks);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
index f4e3678..17a1970 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
@@ -26,6 +26,7 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.PieceCount;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
@@ -263,6 +264,11 @@ public abstract class AbstractPiece<I extends WritableComparable,
   }
 
   @Override
+  public PieceCount getPieceCount() {
+    return new PieceCount(1);
+  }
+
+  @Override
   public String toString() {
     String name = getClass().getSimpleName();
     if (name.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
index 095ea74..ef27818 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
@@ -26,6 +26,7 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.PieceCount;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
@@ -249,6 +250,11 @@ public class DelegatePiece<I extends WritableComparable, V extends Writable,
     }
   }
 
+  @Override
+  public PieceCount getPieceCount() {
+    return new PieceCount(1);
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public void registerAggregators(BlockMasterApi master)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
index 3a8c9e9..923679f 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import org.apache.giraph.block_app.framework.AbstractBlockFactory;
 import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.PieceCount;
 import org.apache.giraph.block_app.framework.block.SequenceBlock;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.Piece;
@@ -101,6 +102,11 @@ public abstract class MigrationFullBlockFactory
           public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
             consumer.apply(curPiece);
           }
+
+          @Override
+          public PieceCount getPieceCount() {
+            return curPiece.getPieceCount();
+          }
         }
     );
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java
index 328b45d..935f0ee 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java
@@ -26,6 +26,7 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
 import org.apache.giraph.block_app.framework.block.Block;
 import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
+import org.apache.giraph.block_app.framework.block.PieceCount;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.DefaultParentPiece;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
@@ -199,6 +200,11 @@ public class BlockApiHandleTest {
       }
 
       @Override
+      public PieceCount getPieceCount() {
+        return piece.getPieceCount();
+      }
+
+      @Override
       public BlockApiHandle getBlockApiHandle() {
         return handle;
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
index 0dacae1..9a10bf4 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.giraph.block_app.framework.piece.Piece;
 import org.apache.giraph.function.Supplier;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestIfBlock {
@@ -53,6 +54,7 @@ public class TestIfBlock {
     BlockTestingUtils.testIndependence(
         Arrays.asList(piece1, piece2),
         ifBlock);
+    Assert.assertFalse(ifBlock.getPieceCount().isKnown());
   }
 
   @Test
@@ -69,6 +71,7 @@ public class TestIfBlock {
     BlockTestingUtils.testIndependence(
         Arrays.asList(piece1, piece2),
         ifBlock);
+    Assert.assertFalse(ifBlock.getPieceCount().isKnown());
   }
 
   @Test
@@ -83,6 +86,19 @@ public class TestIfBlock {
     BlockTestingUtils.testNestedRepeatBlock(
             Arrays.asList(piece1, piece2),
             ifBlock);
+    Assert.assertFalse(ifBlock.getPieceCount().isKnown());
   }
 
+  @Test
+  public void testIfThenElsePieceCount() {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block ifBlock = new IfBlock(
+        TRUE_SUPPLIER,
+        piece1,
+        piece2
+    );
+    Assert.assertTrue(ifBlock.getPieceCount().isKnown());
+    Assert.assertEquals(1, ifBlock.getPieceCount().getCount());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
index 1e096ba..10d815d 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.Piece;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
@@ -46,6 +47,7 @@ public class TestRepeatBlock {
     BlockTestingUtils.testIndependence(
             Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
             repeatBlock);
+    Assert.assertEquals(REPEAT_TIMES * 2, repeatBlock.getPieceCount().getCount());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
index f69e4ac..d1befa5 100644
--- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
@@ -27,6 +27,7 @@ import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.block_app.framework.piece.Piece;
 import org.apache.giraph.function.Supplier;
 import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
@@ -58,6 +59,8 @@ public class TestRepeatUntilBlock {
     BlockTestingUtils.testIndependence(
       Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
       repeatBlock);
+    Assert.assertEquals(2, innerBlock.getPieceCount().getCount());
+    Assert.assertFalse(repeatBlock.getPieceCount().isKnown());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 3a3e8dc..63f6aca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1261,5 +1261,9 @@ public interface GiraphConstants {
       new LongConfOption("giraph.waitForOtherWorkersMsec",
           HOURS.toMillis(48),
           "How long should workers wait to finish superstep");
+
+  /** Number of supersteps job will run for */
+  IntConfOption SUPERSTEP_COUNT = new IntConfOption("giraph.numSupersteps", -1,
+      "Number of supersteps job will run for");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/d4596a02/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index 882c4f4..60951f0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -20,6 +20,7 @@ package org.apache.giraph.job;
 
 import com.google.common.collect.Iterables;
 import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.giraph.worker.WorkerProgressStats;
@@ -50,6 +51,8 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
    * warning will be printed
    */
   private double normalFreeMemoryFraction;
+  /** Total number of supersteps */
+  private final int superstepCount;
   /**
    * How many workers have reported that they are in highest reported
    * superstep
@@ -86,6 +89,7 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
       MasterProgress masterProgress, Configuration conf) {
     this.masterProgress = masterProgress;
     normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
+    superstepCount = GiraphConstants.SUPERSTEP_COUNT.get(conf);
     for (WorkerProgress workerProgress : workerProgresses) {
       if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
         verticesToCompute = 0;
@@ -180,8 +184,12 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
         }
       }
     } else if (isComputeSuperstep()) {
-      sb.append("Compute superstep ").append(currentSuperstep).append(": ");
-      sb.append(verticesComputed).append(" out of ").append(
+      sb.append("Compute superstep ").append(currentSuperstep);
+      if (superstepCount > 0) {
+        // Supersteps are 0..superstepCount-1 so subtract 1 here
+        sb.append(" (out of ").append(superstepCount - 1).append(")");
+      }
+      sb.append(": ").append(verticesComputed).append(" out of ").append(
           verticesToCompute).append(" vertices computed; ");
       sb.append(partitionsComputed).append(" out of ").append(
           partitionsToCompute).append(" partitions computed");


Mime
View raw message