giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 218f602
Date Wed, 24 Jul 2013 22:19:48 GMT
Updated Branches:
  refs/heads/trunk 13c3aa116 -> 218f60236


GIRAPH-722: ProgressableUtils.waitForever is not calling progress (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/218f6023
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/218f6023
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/218f6023

Branch: refs/heads/trunk
Commit: 218f60236ff174852ce91384256d722d2ad48cf8
Parents: 13c3aa1
Author: Maja Kabiljo <majakabiljo@maja-mbp.local>
Authored: Wed Jul 24 15:18:39 2013 -0700
Committer: Maja Kabiljo <majakabiljo@maja-mbp.local>
Committed: Wed Jul 24 15:18:39 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/giraph/utils/ProgressableUtils.java  | 39 +++++++++--
 .../giraph/utils/TestProgressableUtils.java     | 70 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/218f6023/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6df7ad7..bc3e84d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-722: ProgressableUtils.waitForever is not calling progress (majakabiljo)
+
   GIRAPH-549: Tinkerpop/Blueprints/Rexter InputFormat (armax00 via claudio)
 
   GIRAPH-701: Communication improvement using one-to-all message

http://git-wip-us.apache.org/repos/asf/giraph/blob/218f6023/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 3b06604..78c230a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
@@ -41,7 +41,7 @@ public class ProgressableUtils {
   private static final Logger LOG =
       Logger.getLogger(ProgressableUtils.class);
   /** Msecs to refresh the progress meter (one minute) */
-  private static final int MSEC_PERIOD = 60 * 1000;
+  private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
 
   /** Do not instantiate. */
   private ProgressableUtils() {
@@ -53,6 +53,20 @@ public class ProgressableUtils {
    *
    * @param executor     Executor which we are waiting for
    * @param progressable Progressable for reporting progress (Job context)
+   * @param msecsPeriod How often to report progress
+   */
+  public static void awaitExecutorTermination(ExecutorService executor,
+      Progressable progressable, int msecsPeriod) {
+    waitForever(new ExecutorServiceWaitable(executor), progressable,
+        msecsPeriod);
+  }
+
+  /**
+   * Wait for executor tasks to terminate, while periodically reporting
+   * progress.
+   *
+   * @param executor     Executor which we are waiting for
+   * @param progressable Progressable for reporting progress (Job context)
    */
   public static void awaitExecutorTermination(ExecutorService executor,
       Progressable progressable) {
@@ -107,8 +121,22 @@ public class ProgressableUtils {
    */
   private static <T> T waitForever(Waitable<T> waitable,
       Progressable progressable) {
+    return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
+  }
+
+  /**
+   * Wait forever for waitable to finish. Periodically reports progress.
+   *
+   * @param waitable Waitable which we wait for
+   * @param progressable Progressable for reporting progress (Job context)
+   * @param msecsPeriod How often to report progress
+   * @param <T> Result type
+   * @return Result of waitable
+   */
+  private static <T> T waitForever(Waitable<T> waitable,
+      Progressable progressable, int msecsPeriod) {
     while (true) {
-      waitFor(waitable, progressable, MSEC_PERIOD);
+      waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
       if (waitable.isFinished()) {
         try {
           return waitable.getResult();
@@ -130,15 +158,17 @@ public class ProgressableUtils {
    * @param waitable Waitable which we wait for
    * @param progressable Progressable for reporting progress (Job context)
    * @param msecs Number of milliseconds to wait for
+   * @param msecsPeriod How often to report progress
    * @param <T> Result type
    * @return Result of waitable
    */
   private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
-      int msecs) {
+      int msecs, int msecsPeriod) {
     long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
     int currentWaitMsecs;
     while (true) {
-      currentWaitMsecs = Math.min(msecs, MSEC_PERIOD);
+      progressable.progress();
+      currentWaitMsecs = Math.min(msecs, msecsPeriod);
       try {
         waitable.waitFor(currentWaitMsecs);
         if (waitable.isFinished()) {
@@ -157,7 +187,6 @@ public class ProgressableUtils {
       if (System.currentTimeMillis() >= timeoutTimeMsecs) {
         return waitable.getTimeoutResult();
       }
-      progressable.progress();
       msecs = Math.max(0, msecs - currentWaitMsecs);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/218f6023/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java
b/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java
new file mode 100644
index 0000000..c7177a3
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Test ProgressableUtils
+ */
+public class TestProgressableUtils {
+  @Test
+  public void testProgressableUtils() throws NoSuchFieldException,
+      IllegalAccessException {
+    final int sleepTime = 1800;
+    final int msecPeriod = 500;
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException();
+        }
+      }
+    });
+    executor.shutdown();
+    CountProgressable countProgressable = new CountProgressable();
+    ProgressableUtils.awaitExecutorTermination(executor, countProgressable,
+        msecPeriod);
+    Assert.assertTrue(countProgressable.counter >= sleepTime / msecPeriod);
+    Assert.assertTrue(
+        countProgressable.counter <= (sleepTime + msecPeriod) / msecPeriod);
+  }
+
+  private static class CountProgressable implements Progressable {
+    private int counter = 0;
+
+    @Override
+    public void progress() {
+      counter++;
+    }
+  }
+}


Mime
View raw message