giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1418483 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/graph/ giraph/src/test/java/org/apache/giraph/
Date Fri, 07 Dec 2012 21:23:40 GMT
Author: aching
Date: Fri Dec  7 21:23:39 2012
New Revision: 1418483

URL: http://svn.apache.org/viewvc?rev=1418483&view=rev
Log:
GIRAPH-446: Add a proper timeout for waiting for workers to join a
superstep. (aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Dec  7 21:23:39 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-446: Add a proper timeout for waiting for workers to join a
+  superstep. (aching)
+
   GIRAPH-443: Properly size netty buffers when encoding requests (majakabiljo)
 
   GIRAPH-395: No need to make HashWorkerPartitioner thread-safe. (aching)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Fri Dec  7 21:23:39 2012
@@ -135,11 +135,6 @@ public class GiraphConfiguration extends
   /** Default 100% response rate for workers */
   public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
 
-  /** Polling timeout to check on the number of responded tasks (int) */
-  public static final String POLL_MSECS = "giraph.pollMsecs";
-  /** Default poll msecs (30 seconds) */
-  public static final int POLL_MSECS_DEFAULT = 30 * 1000;
-
   /** Enable the Metrics system **/
   public static final String METRICS_ENABLE = "giraph.metrics.enable";
 
@@ -303,12 +298,34 @@ public class GiraphConfiguration extends
   /** Default max resolve address attempts */
   public static final int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
 
-  /** Msecs to wait between waiting for all requests to finish */
+  /** Milliseconds to wait between waiting for all requests to finish */
   public static final String WAITING_REQUEST_MSECS =
       "giraph.waitingRequestMsecs";
-  /** Default msecs to wait between waiting for all requests to finish */
+  /**
+   * Default milliseconds to wait between waiting for all requests to finish
+   */
   public static final int WAITING_REQUEST_MSECS_DEFAULT = 15000;
 
+  /** Milliseconds to wait for an event before continuing */
+  public static final String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs";
+  /**
+   * Default milliseconds to wait for an event before continuing (30 seconds)
+   */
+  public static final int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000;
+
+  /**
+   * Maximum milliseconds to wait before giving up trying to get the minimum
+   * number of workers before a superstep (int).
+   */
+  public static final String MAX_MASTER_SUPERSTEP_WAIT_MSECS =
+      "giraph.maxMasterSuperstepWaitMsecs";
+  /**
+   * Default maximum milliseconds to wait before giving up trying to get
+   * the minimum number of workers before a superstep (10 minutes).
+   */
+  public static final int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT =
+      10 * 60 * 1000;
+
   /** Milliseconds for a request to complete (or else resend) */
   public static final String MAX_REQUEST_MILLISECONDS =
       "giraph.maxRequestMilliseconds";
@@ -413,11 +430,6 @@ public class GiraphConfiguration extends
   /** Default number of threads for input splits loading */
   public static final int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
 
-  /** Number of poll attempts prior to failing the job (int) */
-  public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
-  /** Default poll attempts */
-  public static final int POLL_ATTEMPTS_DEFAULT = 10;
-
   /** Number of minimum vertices in each vertex range */
   public static final String MIN_VERTICES_PER_RANGE =
       "giraph.minVerticesPerRange";
@@ -1170,4 +1182,47 @@ public class GiraphConfiguration extends
   public int getMaxTaskAttempts() {
     return getInt(MAX_TASK_ATTEMPTS, -1);
   }
+
+  /**
+   * Get the number of milliseconds to wait for an event before continuing on
+   *
+   * @return Number of milliseconds to wait for an event before continuing on
+   */
+  public int getEventWaitMsecs() {
+    return getInt(EVENT_WAIT_MSECS, EVENT_WAIT_MSECS_DEFAULT);
+  }
+
+  /**
+   * Set the number of milliseconds to wait for an event before continuing on
+   *
+   * @param eventWaitMsecs Number of milliseconds to wait for an event before
+   *                       continuing on
+   */
+  public void setEventWaitMsecs(int eventWaitMsecs) {
+    setInt(EVENT_WAIT_MSECS, eventWaitMsecs);
+  }
+
+  /**
+   * Get the maximum milliseconds to wait before giving up trying to get the
+   * minimum number of workers before a superstep.
+   *
+   * @return Maximum milliseconds to wait before giving up trying to get the
+   *         minimum number of workers before a superstep
+   */
+  public int getMaxMasterSuperstepWaitMsecs() {
+    return getInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS,
+        MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT);
+  }
+
+  /**
+   * Set the maximum milliseconds to wait before giving up trying to get the
+   * minimum number of workers before a superstep.
+   *
+   * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
+   *                                    giving up trying to get the minimum
+   *                                    number of workers before a superstep
+   */
+  public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
+    setInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, maxMasterSuperstepWaitMsecs);
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Dec  7 21:23:39 2012
@@ -125,10 +125,10 @@ public class BspServiceMaster<I extends 
   private final int minWorkers;
   /** Min % responded workers */
   private final float minPercentResponded;
-  /** Poll period in msecs */
-  private final int msecsPollPeriod;
-  /** Max number of poll attempts */
-  private final int maxPollAttempts;
+  /** Msecs to wait for an event */
+  private final int eventWaitMsecs;
+  /** Max msecs to wait for a superstep to get enough workers */
+  private final int maxSuperstepWaitMsecs;
   /** Min number of long tails before printing */
   private final int partitionLongTailMinPrint;
   /** Last finalized checkpoint */
@@ -188,10 +188,8 @@ public class BspServiceMaster<I extends 
     minWorkers = conf.getInt(GiraphConfiguration.MIN_WORKERS, -1);
     minPercentResponded = conf.getFloat(
         GiraphConfiguration.MIN_PERCENT_RESPONDED, 100.0f);
-    msecsPollPeriod = conf.getInt(GiraphConfiguration.POLL_MSECS,
-        GiraphConfiguration.POLL_MSECS_DEFAULT);
-    maxPollAttempts = conf.getInt(GiraphConfiguration.POLL_ATTEMPTS,
-        GiraphConfiguration.POLL_ATTEMPTS_DEFAULT);
+    eventWaitMsecs = conf.getEventWaitMsecs();
+    maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs();
     partitionLongTailMinPrint = conf.getInt(
         GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT,
         GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
@@ -419,11 +417,13 @@ public class BspServiceMaster<I extends 
    */
   private List<WorkerInfo> checkWorkers() {
     boolean failJob = true;
-    int pollAttempt = 0;
+    long failWorkerCheckMsecs =
+        SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs;
     List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
     List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
     int totalResponses = -1;
-    while (pollAttempt < maxPollAttempts) {
+    while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) {
+      getContext().progress();
       getAllWorkerInfos(
           getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
       totalResponses = healthyWorkerInfoList.size() +
@@ -440,7 +440,7 @@ public class BspServiceMaster<I extends 
           " needed to start superstep " +
           getSuperstep());
       if (getWorkerHealthRegistrationChangedEvent().waitMsecs(
-          msecsPollPeriod)) {
+          eventWaitMsecs)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("checkWorkers: Got event that health " +
               "registration changed, not using poll attempt");
@@ -452,9 +452,10 @@ public class BspServiceMaster<I extends 
         LOG.info("checkWorkers: Only found " + totalResponses +
             " responses of " + maxWorkers +
             " needed to start superstep " +
-            getSuperstep() + ".  Sleeping for " +
-            msecsPollPeriod + " msecs and used " + pollAttempt +
-            " of " + maxPollAttempts + " attempts.");
+            getSuperstep() + ".  Reporting every " +
+            eventWaitMsecs + " msecs, " +
+            (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
+            " more msecs left before giving up.");
         // Find the missing workers if there are only a few
         if ((maxWorkers - totalResponses) <=
             partitionLongTailMinPrint) {
@@ -462,15 +463,14 @@ public class BspServiceMaster<I extends 
               unhealthyWorkerInfoList);
         }
       }
-      ++pollAttempt;
     }
     if (failJob) {
       LOG.error("checkWorkers: Did not receive enough processes in " +
           "time (only " + totalResponses + " of " +
-          minWorkers + " required).  This occurs if you do not " +
-          "have enough map tasks available simultaneously on " +
-          "your Hadoop instance to fulfill the number of " +
-          "requested workers.");
+          minWorkers + " required) after waiting " + maxSuperstepWaitMsecs +
+          " msecs.  This occurs if you do not have enough map tasks " +
+          "available simultaneously on your Hadoop instance to fulfill " +
+          "the number of requested workers.");
       return null;
     }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Dec  7 21:23:39 2012
@@ -348,11 +348,11 @@ public class BspServiceWorker<I extends 
         inputSplitsReadyStat = getZkExt().exists(
             inputSplitPaths.getAllReadyPath(), true);
       } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "setup: KeeperException waiting on input splits", e);
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "KeeperException waiting on input splits", e);
       } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "setup: InterruptedException waiting on input splits", e);
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "InterruptedException waiting on input splits", e);
       }
       if (inputSplitsReadyStat != null) {
         break;
@@ -380,12 +380,11 @@ public class BspServiceWorker<I extends 
           CreateMode.PERSISTENT,
           true);
     } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "setup: KeeperException creating worker done splits", e);
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "KeeperException creating worker done splits", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "setup: InterruptedException creating worker done splits",
-          e);
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "InterruptedException creating worker done splits", e);
     }
     while (true) {
       Stat inputSplitsDoneStat;
@@ -394,11 +393,11 @@ public class BspServiceWorker<I extends 
             getZkExt().exists(inputSplitPaths.getAllDonePath(),
                 true);
       } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "setup: KeeperException waiting on worker done splits", e);
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "KeeperException waiting on worker done splits", e);
       } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "setup: InterruptedException waiting on worker done splits", e);
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "InterruptedException waiting on worker done splits", e);
       }
       if (inputSplitsDoneStat != null) {
         break;

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Fri Dec  7 21:23:39 2012
@@ -100,8 +100,8 @@ public class BspCase implements Watcher 
       // Single node testing
       conf.setBoolean(GiraphConfiguration.SPLIT_MASTER_WORKER, false);
     }
-    conf.setInt(GiraphConfiguration.POLL_ATTEMPTS, 10);
-    conf.setInt(GiraphConfiguration.POLL_MSECS, 3 * 1000);
+    conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
+    conf.setEventWaitMsecs(3 * 1000);
     conf.setInt(GiraphConfiguration.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
     if (getZooKeeperList() != null) {
       conf.setZooKeeperConfiguration(getZooKeeperList());

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Fri Dec  7 21:23:39 2012
@@ -66,8 +66,9 @@ public class TestAutoCheckpoint extends 
     conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
     conf.setInt("mapred.map.max.attempts", 4);
     // Trigger failure faster
-    conf.setInt("mapred.task.timeout", 30000);
-    conf.setInt(GiraphConfiguration.POLL_MSECS, 5000);
+    conf.setInt("mapred.task.timeout", 10000);
+    conf.setMaxMasterSuperstepWaitMsecs(10000);
+    conf.setEventWaitMsecs(1000);
     conf.setCheckpointFrequency(2);
     conf.set(GiraphConfiguration.CHECKPOINT_DIRECTORY,
         getTempPath("_singleFaultCheckpoints").toString());

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java?rev=1418483&r1=1418482&r2=1418483&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java Fri Dec  7 21:23:39 2012
@@ -18,10 +18,7 @@
 
 package org.apache.giraph;
 
-import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
-
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
@@ -29,44 +26,46 @@ import org.apache.giraph.graph.GiraphJob
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+
 /**
  * Unit test for not enough map tasks
  */
 public class TestNotEnoughMapTasks extends BspCase {
 
-    public TestNotEnoughMapTasks() {
-        super(TestNotEnoughMapTasks.class.getName());
-    }
-
-    /**
-     * This job should always fail gracefully with not enough map tasks.
-     *
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public void testNotEnoughMapTasks()
-            throws IOException, InterruptedException, ClassNotFoundException {
-        if (!runningInDistributedMode()) {
-            System.out.println(
-                "testNotEnoughMapTasks: Ignore this test in local mode.");
-            return;
-        }
-        Path outputPath = getTempPath(getCallingMethodName());
-        GiraphJob job = prepareJob(getCallingMethodName(),
-            SimpleCheckpointVertex.SimpleCheckpointComputation.class,
-            SimpleSuperstepVertexInputFormat.class,
-            SimpleSuperstepVertexOutputFormat.class, outputPath);
-
-        // An unlikely impossible number of workers to achieve
-        final int unlikelyWorkers = Short.MAX_VALUE;
-        job.getConfiguration().setWorkerConfiguration(unlikelyWorkers, 
-            unlikelyWorkers, 
-            100.0f);
-        // Only one poll attempt of one second to make failure faster
-        job.getConfiguration().setInt(GiraphConfiguration.POLL_ATTEMPTS, 1);
-        job.getConfiguration().setInt(GiraphConfiguration.POLL_MSECS, 1);
-        assertFalse(job.run(false));
+  public TestNotEnoughMapTasks() {
+    super(TestNotEnoughMapTasks.class.getName());
+  }
+
+  /**
+   * This job should always fail gracefully with not enough map tasks.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testNotEnoughMapTasks()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    if (!runningInDistributedMode()) {
+      System.out.println(
+          "testNotEnoughMapTasks: Ignore this test in local mode.");
+      return;
     }
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleCheckpointVertex.SimpleCheckpointComputation.class,
+        SimpleSuperstepVertexInputFormat.class,
+        SimpleSuperstepVertexOutputFormat.class, outputPath);
+
+    // An unlikely impossible number of workers to achieve
+    final int unlikelyWorkers = Short.MAX_VALUE;
+    job.getConfiguration().setWorkerConfiguration(unlikelyWorkers,
+        unlikelyWorkers,
+        100.0f);
+    // Wait at most one second for enough workers to make failure faster
+    job.getConfiguration().setMaxMasterSuperstepWaitMsecs(1000);
+    job.getConfiguration().setEventWaitMsecs(1000);
+    assertFalse(job.run(false));
+  }
 }



Mime
View raw message